summaryrefslogtreecommitdiffstats
path: root/sys/netgraph/netflow/netflow.c
diff options
context:
space:
mode:
authorglebius <glebius@FreeBSD.org>2005-05-11 11:26:24 +0000
committerglebius <glebius@FreeBSD.org>2005-05-11 11:26:24 +0000
commit8455545285d4f5cb42b42bade7aceafc22d1cd59 (patch)
treea20a65ec59042108874f8178d54da871a4d1f80b /sys/netgraph/netflow/netflow.c
parent6413eea55ab7218f882f99d922684166d739e1f4 (diff)
downloadFreeBSD-src-8455545285d4f5cb42b42bade7aceafc22d1cd59.zip
FreeBSD-src-8455545285d4f5cb42b42bade7aceafc22d1cd59.tar.gz
A new version of NetFlow node.
The most significant changes are: - Use UMA zone instead of own chunk of memory. - Lock each hash entry separately. - Expire items "actively" - interrupt method can expire flows from hash slot, when it searches through it. - Remove global tailqueue. Make callout thread search through every hash slot. - Export datagram is detached from private data and filled. If it is incomplete, it is attached back. Another thread will continue working with it. Lesser, but also important speedups: - Flows in hash slot are stored in tailqueue. Whenever a flow is hit, it is moved to the begging, so it can be located quicker. - When callout thread works with hash slot it bails out if slot mutex is contested.
Diffstat (limited to 'sys/netgraph/netflow/netflow.c')
-rw-r--r--sys/netgraph/netflow/netflow.c562
1 files changed, 293 insertions, 269 deletions
diff --git a/sys/netgraph/netflow/netflow.c b/sys/netgraph/netflow/netflow.c
index 296849d4..4ed9e19 100644
--- a/sys/netgraph/netflow/netflow.c
+++ b/sys/netgraph/netflow/netflow.c
@@ -38,9 +38,9 @@ static const char rcs_id[] =
#include <sys/systm.h>
#include <sys/socket.h>
+#include <machine/atomic.h>
+
#include <net/if.h>
-#include <net/if_var.h>
-#include <net/if_dl.h>
#include <net/route.h>
#include <netinet/in.h>
#include <netinet/in_systm.h>
@@ -54,7 +54,7 @@ static const char rcs_id[] =
#include <netgraph/netflow/netflow.h>
#include <netgraph/netflow/ng_netflow.h>
-#define NBUCKETS (4096) /* must be power of 2 */
+#define NBUCKETS (65536) /* must be power of 2 */
/* This hash is for TCP or UDP packets */
#define FULL_HASH(addr1,addr2,port1,port2)\
@@ -63,7 +63,7 @@ static const char rcs_id[] =
((port1 ^ port2) << 8) )& \
(NBUCKETS - 1))
-/* This hash for all other IP packets */
+/* This hash is for all other IP packets */
#define ADDR_HASH(addr1,addr2)\
(((addr1 >> 16) ^ \
(addr2 & 0x00FF) )& \
@@ -95,13 +95,13 @@ static const char rcs_id[] =
((t) << 5) + /* 32 */ \
((t) << 3)) /* 8 */
-MALLOC_DECLARE(M_NETFLOW);
-MALLOC_DEFINE(M_NETFLOW, "NetFlow", "flow cache");
+MALLOC_DECLARE(M_NETFLOW_HASH);
+MALLOC_DEFINE(M_NETFLOW_HASH, "NetFlow hash", "NetFlow hash");
-static int export_add(priv_p , struct flow_entry *);
-static int export_send(priv_p );
+static int export_add(item_p, struct flow_entry *);
+static int export_send(priv_p, item_p);
-/* Generate hash for a given flow record */
+/* Generate hash for a given flow record. */
static __inline uint32_t
ip_hash(struct flow_rec *r)
{
@@ -115,88 +115,113 @@ ip_hash(struct flow_rec *r)
}
}
-/* Lookup for record in given slot */
-static __inline struct flow_entry *
-hash_lookup(struct flow_hash_entry *h, int slot, struct flow_rec *r)
+/* This is callback from uma(9), called on alloc. */
+static int
+uma_ctor_flow(void *mem, int size, void *arg, int how)
{
- struct flow_entry *fle;
+ priv_p priv = (priv_p )arg;
+
+ if (atomic_load_acq_32(&priv->info.nfinfo_used) >= CACHESIZE)
+ return (ENOMEM);
- LIST_FOREACH(fle, &(h[slot].head), fle_hash)
- if (bcmp(r, &fle->f.r, sizeof(struct flow_rec)) == 0)
- return (fle);
+ atomic_add_32(&priv->info.nfinfo_used, 1);
- return (NULL);
+ return (0);
}
-/* Get a flow entry from free list */
-static __inline struct flow_entry *
-alloc_flow(priv_p priv, int *flows)
+/* This is callback from uma(9), called on free. */
+static void
+uma_dtor_flow(void *mem, int size, void *arg)
{
- register struct flow_entry *fle;
-
- mtx_lock(&priv->free_mtx);
-
- if (SLIST_EMPTY(&priv->free_list)) {
- mtx_unlock(&priv->free_mtx);
- return(NULL);
- }
+ priv_p priv = (priv_p )arg;
- fle = SLIST_FIRST(&priv->free_list);
- SLIST_REMOVE_HEAD(&priv->free_list, fle_free);
+ atomic_subtract_32(&priv->info.nfinfo_used, 1);
+}
- priv->info.nfinfo_used++;
- priv->info.nfinfo_free--;
+/*
+ * Detach export datagram from priv, if there is any.
+ * If there is no, allocate a new one.
+ */
+static item_p
+get_export_dgram(priv_p priv)
+{
+ item_p item = NULL;
- if (flows != NULL)
- *flows = priv->info.nfinfo_used;
+ mtx_lock(&priv->export_mtx);
+ if (priv->export_item != NULL) {
+ item = priv->export_item;
+ priv->export_item = NULL;
+ }
+ mtx_unlock(&priv->export_mtx);
+
+ if (item == NULL) {
+ struct netflow_v5_export_dgram *dgram;
+ struct mbuf *m;
+
+ m = m_getcl(M_DONTWAIT, MT_DATA, M_PKTHDR);
+ if (m == NULL)
+ return (NULL);
+ item = ng_package_data(m, NULL);
+ if (item == NULL)
+ return (NULL);
+ dgram = mtod(m, struct netflow_v5_export_dgram *);
+ dgram->header.count = 0;
+ dgram->header.version = htons(NETFLOW_V5);
- mtx_unlock(&priv->free_mtx);
+ }
- return (fle);
+ return (item);
}
-/* Insert flow entry into a free list. */
-static __inline int
-free_flow(priv_p priv, struct flow_entry *fle)
+/*
+ * Re-attach incomplete datagram back to priv.
+ * If there is already another one, then send incomplete. */
+static void
+return_export_dgram(priv_p priv, item_p item)
{
- int flows;
-
- mtx_lock(&priv->free_mtx);
- fle->f.packets = 0;
- SLIST_INSERT_HEAD(&priv->free_list, fle, fle_free);
- flows = priv->info.nfinfo_used--;
- priv->info.nfinfo_free++;
- mtx_unlock(&priv->free_mtx);
-
- return flows;
+ /*
+ * It may happen on SMP, that some thread has already
+ * put its item there, in this case we bail out and
+ * send what we have to collector.
+ */
+ mtx_lock(&priv->export_mtx);
+ if (priv->export_item == NULL) {
+ priv->export_item = item;
+ mtx_unlock(&priv->export_mtx);
+ } else {
+ mtx_unlock(&priv->export_mtx);
+ export_send(priv, item);
+ }
}
-#define NGNF_GETUSED(priv, rval) do { \
- mtx_lock(&priv->free_mtx); \
- rval = priv->info.nfinfo_used; \
- mtx_unlock(&priv->free_mtx); \
- } while (0)
-
-/* Insert flow entry into expire list. */
-/* XXX: Flow must be detached from work queue, but not from cache */
+/*
+ * The flow is over. Call export_add() and free it. If datagram is
+ * full, then call export_send().
+ */
static __inline void
-expire_flow(priv_p priv, struct flow_entry *fle)
+expire_flow(priv_p priv, item_p *item, struct flow_entry *fle)
{
- mtx_assert(&priv->work_mtx, MA_OWNED);
- LIST_REMOVE(fle, fle_hash);
-
- mtx_lock(&priv->expire_mtx);
- SLIST_INSERT_HEAD(&priv->expire_list, fle, fle_free);
- mtx_unlock(&priv->expire_mtx);
+ if (*item == NULL)
+ *item = get_export_dgram(priv);
+ if (*item == NULL) {
+ /* XXX: do stats! */
+ log(LOG_DEBUG, "get_export_dgram failed\n");
+ uma_zfree_arg(priv->zone, fle, priv);
+ return;
+ }
+ if (export_add(*item, fle) > 0) {
+ export_send(priv, *item);
+ *item = NULL;
+ }
+ uma_zfree_arg(priv->zone, fle, priv);
}
/* Get a snapshot of node statistics */
void
ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i)
{
- mtx_lock(&priv->free_mtx);
+ /* XXX: atomic */
memcpy((void *)i, (void *)&priv->info, sizeof(priv->info));
- mtx_unlock(&priv->free_mtx);
}
/* Calculate number of bits in netmask */
@@ -216,21 +241,27 @@ bit_count(uint32_t v)
* Insert a record into defined slot.
*
* First we get for us a free flow entry, then fill in all
- * possible fields in it. Then obtain lock on flow cache
- * and insert flow entry.
+ * possible fields in it.
+ *
+ * TODO: consider dropping hash mutex while filling in datagram,
+ * as this was done in previous version. Need to test & profile
+ * to be sure.
*/
static __inline int
-hash_insert(priv_p priv, int slot, struct flow_rec *r, int plen,
- uint8_t tcp_flags)
+hash_insert(priv_p priv, struct flow_hash_entry *hsh, struct flow_rec *r,
+ int plen, uint8_t tcp_flags)
{
- struct flow_hash_entry *h = priv->hash;
struct flow_entry *fle;
struct route ro;
struct sockaddr_in *sin;
- fle = alloc_flow(priv, NULL);
- if (fle == NULL)
+ mtx_assert(&hsh->mtx, MA_OWNED);
+
+ fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT);
+ if (fle == NULL) {
+ atomic_add_32(&priv->info.nfinfo_failed, 1);
return (ENOMEM);
+ }
/*
* Now fle is totally ours. It is detached from all lists,
@@ -295,11 +326,8 @@ hash_insert(priv_p priv, int slot, struct flow_rec *r, int plen,
RTFREE(ro.ro_rt);
}
- /* Push new flow entry into flow cache */
- mtx_lock(&priv->work_mtx);
- LIST_INSERT_HEAD(&(h[slot].head), fle, fle_hash);
- TAILQ_INSERT_TAIL(&priv->work_queue, fle, fle_work);
- mtx_unlock(&priv->work_mtx);
+ /* Push new flow at the and of hash. */
+ TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
return (0);
}
@@ -313,40 +341,31 @@ hash_insert(priv_p priv, int slot, struct flow_rec *r, int plen,
int
ng_netflow_cache_init(priv_p priv)
{
- struct flow_entry *fle;
+ struct flow_hash_entry *hsh;
int i;
- /* allocate cache */
- MALLOC(priv->cache, struct flow_entry *,
- CACHESIZE * sizeof(struct flow_entry),
- M_NETFLOW, M_WAITOK | M_ZERO);
-
- if (priv->cache == NULL)
- return (ENOMEM);
+ /* Initialize cache UMA zone. */
+ priv->zone = uma_zcreate("NetFlow cache", sizeof(struct flow_entry),
+ uma_ctor_flow, uma_dtor_flow, NULL, NULL, UMA_ALIGN_CACHE, 0);
+ uma_zone_set_max(priv->zone, CACHESIZE);
- /* allocate hash */
+ /* Allocate hash. */
MALLOC(priv->hash, struct flow_hash_entry *,
NBUCKETS * sizeof(struct flow_hash_entry),
- M_NETFLOW, M_WAITOK | M_ZERO);
+ M_NETFLOW_HASH, M_WAITOK | M_ZERO);
if (priv->hash == NULL) {
- FREE(priv->cache, M_NETFLOW);
+ uma_zdestroy(priv->zone);
return (ENOMEM);
}
- TAILQ_INIT(&priv->work_queue);
- SLIST_INIT(&priv->free_list);
- SLIST_INIT(&priv->expire_list);
-
- mtx_init(&priv->work_mtx, "ng_netflow cache mutex", NULL, MTX_DEF);
- mtx_init(&priv->free_mtx, "ng_netflow free mutex", NULL, MTX_DEF);
- mtx_init(&priv->expire_mtx, "ng_netflow expire mutex", NULL, MTX_DEF);
-
- /* build free list */
- for (i = 0, fle = priv->cache; i < CACHESIZE; i++, fle++)
- SLIST_INSERT_HEAD(&priv->free_list, fle, fle_free);
+ /* Initialize hash. */
+ for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) {
+ mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
+ TAILQ_INIT(&hsh->head);
+ }
- priv->info.nfinfo_free = CACHESIZE;
+ mtx_init(&priv->export_mtx, "export dgram lock", NULL, MTX_DEF);
return (0);
}
@@ -355,7 +374,9 @@ ng_netflow_cache_init(priv_p priv)
void
ng_netflow_cache_flush(priv_p priv)
{
- register struct flow_entry *fle;
+ struct flow_entry *fle, *fle1;
+ struct flow_hash_entry *hsh;
+ item_p item = NULL;
int i;
/*
@@ -363,36 +384,39 @@ ng_netflow_cache_flush(priv_p priv)
* Expire everything before freeing it.
* No locking is required since callout is already drained.
*/
+ for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++)
+ TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
+ TAILQ_REMOVE(&hsh->head, fle, fle_hash);
+ expire_flow(priv, &item, fle);
+ }
- for (i = 0, fle = priv->cache; i < CACHESIZE; i++, fle++)
- if (!ISFREE(fle))
- /* ignore errors now */
- (void )export_add(priv, fle);
+ if (item != NULL)
+ export_send(priv, item);
- mtx_destroy(&priv->work_mtx);
- mtx_destroy(&priv->free_mtx);
- mtx_destroy(&priv->expire_mtx);
+ uma_zdestroy(priv->zone);
- /* free hash memory */
- if (priv->hash)
- FREE(priv->hash, M_NETFLOW);
+ /* Destroy hash mutexes. */
+ for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++)
+ mtx_destroy(&hsh->mtx);
- /* free flow cache */
- if (priv->cache)
- FREE(priv->cache, M_NETFLOW);
+ /* Free hash memory. */
+ if (priv->hash)
+ FREE(priv->hash, M_NETFLOW_HASH);
+ mtx_destroy(&priv->export_mtx);
}
-/* Insert packet from &m into flow cache. */
+/* Insert packet from into flow cache. */
int
ng_netflow_flow_add(priv_p priv, struct ip *ip, iface_p iface,
struct ifnet *ifp)
{
- struct flow_hash_entry *h = priv->hash;
- register struct flow_entry *fle;
+ register struct flow_entry *fle, *fle1;
+ struct flow_hash_entry *hsh;
struct flow_rec r;
+ item_p item = NULL;
int hlen, plen;
- uint32_t slot;
+ int error = 0;
uint8_t tcp_flags = 0;
/* Try to fill flow_rec r */
@@ -449,19 +473,32 @@ ng_netflow_flow_add(priv_p priv, struct ip *ip, iface_p iface,
break;
}
- slot = ip_hash(&r);
-
- mtx_lock(&priv->work_mtx);
-
- /* Update node statistics. */
+ /* Update node statistics. XXX: race... */
priv->info.nfinfo_packets ++;
priv->info.nfinfo_bytes += plen;
- fle = hash_lookup(h, slot, &r); /* New flow entry or existent? */
+ /* Find hash slot. */
+ hsh = &priv->hash[ip_hash(&r)];
- if (fle) { /* an existent entry */
+ mtx_lock(&hsh->mtx);
+
+ /*
+ * Go through hash and find our entry. If we encounter an
+ * entry, that should be expired, purge it. We do a reverse
+ * search since most active entries are first, and most
+ * searches are done on most active entries.
+ */
+ TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
+ if (bcmp(&r, &fle->f.r, sizeof(struct flow_rec)) == 0)
+ break;
+ if ((INACTIVE(fle) && SMALL(fle)) || AGED(fle)) {
+ TAILQ_REMOVE(&hsh->head, fle, fle_hash);
+ expire_flow(priv, &item, fle);
+ atomic_add_32(&priv->info.nfinfo_act_exp, 1);
+ }
+ }
- TAILQ_REMOVE(&priv->work_queue, fle, fle_work);
+ if (fle) { /* An existent entry. */
fle->f.bytes += plen;
fle->f.packets ++;
@@ -475,135 +512,152 @@ ng_netflow_flow_add(priv_p priv, struct ip *ip, iface_p iface,
* - it is going to overflow counter
*/
if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) ||
- (fle->f.bytes >= (UINT_MAX - IF_MAXMTU)) )
- expire_flow(priv, fle);
- else
- TAILQ_INSERT_TAIL(&priv->work_queue, fle, fle_work);
-
- mtx_unlock(&priv->work_mtx);
-
- } else { /* a new flow entry */
-
- mtx_unlock(&priv->work_mtx);
- return hash_insert(priv, slot, &r, plen, tcp_flags);
+ (fle->f.bytes >= (UINT_MAX - IF_MAXMTU)) ) {
+ TAILQ_REMOVE(&hsh->head, fle, fle_hash);
+ expire_flow(priv, &item, fle);
+ } else {
+ /*
+ * It is the newest, move it to the tail,
+ * if it isn't there already. Next search will
+ * locate it quicker.
+ */
+ if (fle != TAILQ_LAST(&hsh->head, fhead)) {
+ TAILQ_REMOVE(&hsh->head, fle, fle_hash);
+ TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
+ }
+ }
+ } else /* A new flow entry. */
+ error = hash_insert(priv, hsh, &r, plen, tcp_flags);
- }
+ mtx_unlock(&hsh->mtx);
- mtx_assert(&priv->work_mtx, MA_NOTOWNED);
- mtx_assert(&priv->expire_mtx, MA_NOTOWNED);
- mtx_assert(&priv->free_mtx, MA_NOTOWNED);
+ if (item != NULL)
+ return_export_dgram(priv, item);
- return (0);
+ return (error);
}
/*
- * Return records from cache. netgraph(4) guarantees us that we
- * are locked against ng_netflow_rcvdata(). However we can
- * work with ng_netflow_expire() in parrallel. XXX: Is it dangerous?
+ * Return records from cache to userland.
*
+ * TODO: consider NGM_READONLY
* TODO: matching particular IP should be done in kernel, here.
*/
int
ng_netflow_flow_show(priv_p priv, uint32_t last, struct ng_mesg *resp)
{
+ struct flow_hash_entry *hsh;
struct flow_entry *fle;
struct ngnf_flows *data;
+ int i;
data = (struct ngnf_flows *)resp->data;
data->last = 0;
data->nentries = 0;
/* Check if this is a first run */
- if (last == 0)
- fle = priv->cache;
- else {
- if (last > CACHESIZE-1)
+ if (last == 0) {
+ hsh = priv->hash;
+ i = 0;
+ } else {
+ if (last > NBUCKETS-1)
return (EINVAL);
- fle = priv->cache + last;
+ hsh = priv->hash + last;
+ i = last;
}
/*
* We will transfer not more than NREC_AT_ONCE. More data
* will come in next message.
- * We send current stop point to userland, and userland should return
- * it back to us.
+ * We send current hash index to userland, and userland should
+ * return it back to us. Then, we will restart with new entry.
+ *
+ * The resulting cache snapshot is inaccurate for the
+ * following reasons:
+ * - we skip locked hash entries
+ * - we bail out, if someone wants our entry
+ * - we skip rest of entry, when hit NREC_AT_ONCE
*/
- for (; last < CACHESIZE; fle++, last++) {
- if (ISFREE(fle))
+ for (; i < NBUCKETS; hsh++, i++) {
+ if (mtx_trylock(&hsh->mtx) == 0)
continue;
- bcopy(&fle->f, &(data->entries[data->nentries]),
- sizeof(fle->f));
- data->nentries ++;
- if (data->nentries == NREC_AT_ONCE) {
- if (++last < CACHESIZE)
- data->last = (++fle - priv->cache);
- return (0);
+
+ TAILQ_FOREACH(fle, &hsh->head, fle_hash) {
+ if (hsh->mtx.mtx_lock & MTX_CONTESTED)
+ break;
+
+ bcopy(&fle->f, &(data->entries[data->nentries]),
+ sizeof(fle->f));
+ data->nentries++;
+ if (data->nentries == NREC_AT_ONCE) {
+ mtx_unlock(&hsh->mtx);
+ if (++i < NBUCKETS)
+ data->last = i;
+ return (0);
+ }
}
- }
+ mtx_unlock(&hsh->mtx);
+ }
return (0);
}
/* We have full datagram in privdata. Send it to export hook. */
static int
-export_send(priv_p priv)
+export_send(priv_p priv, item_p item)
{
- struct netflow_v5_header *header = &priv->dgram.header;
+ struct mbuf *m = NGI_M(item);
+ struct netflow_v5_export_dgram *dgram = mtod(m,
+ struct netflow_v5_export_dgram *);
+ struct netflow_v5_header *header = &dgram->header;
struct timespec ts;
- struct mbuf *m;
int error = 0;
- int mlen;
- header->sys_uptime = htonl(MILLIUPTIME(time_uptime));
+ /* Fill mbuf header. */
+ m->m_len = m->m_pkthdr.len = sizeof(struct netflow_v5_record) *
+ header->count + sizeof(struct netflow_v5_header);
+ /* Fill export header. */
+ header->sys_uptime = htonl(MILLIUPTIME(time_uptime));
getnanotime(&ts);
header->unix_secs = htonl(ts.tv_sec);
header->unix_nsecs = htonl(ts.tv_nsec);
-
- /* Flow sequence contains number of first record */
- header->flow_seq = htonl(priv->flow_seq - header->count);
-
- mlen = sizeof(struct netflow_v5_header) +
- sizeof(struct netflow_v5_record) * header->count;
-
header->count = htons(header->count);
- if ((m = m_devget((caddr_t)header, mlen, 0, NULL, NULL)) == NULL) {
- log(LOG_CRIT, "ng_netflow: m_devget() failed, losing export "
- "dgram\n");
- header->count = 0;
- return(ENOBUFS);
- }
-
- header->count = 0;
+ header->flow_seq = htonl(atomic_load_acq_32(&priv->flow_seq));
- /* Giant is required in sosend() at this moment. */
- NET_LOCK_GIANT();
- NG_SEND_DATA_ONLY(error, priv->export, m);
- NET_UNLOCK_GIANT();
+ /* Flow sequence contains number of first record, so it
+ is updated after being put in header. */
+ atomic_add_32(&priv->flow_seq, header->count);
- if (error)
- NG_FREE_M(m);
+ if (priv->export != NULL)
+ /* Should also NET_LOCK_GIANT(). */
+ NG_FWD_ITEM_HOOK(error, item, priv->export);
return (error);
}
-/* Create export datagram. */
+/* Add export record to dgram. */
static int
-export_add(priv_p priv, struct flow_entry *fle)
+export_add(item_p item, struct flow_entry *fle)
{
- struct netflow_v5_header *header = &priv->dgram.header;
+ struct netflow_v5_export_dgram *dgram = mtod(NGI_M(item),
+ struct netflow_v5_export_dgram *);
+ struct netflow_v5_header *header = &dgram->header;
struct netflow_v5_record *rec;
if (header->count == 0 ) { /* first record */
- rec = &priv->dgram.r[0];
+ rec = &dgram->r[0];
header->count = 1;
} else { /* continue filling datagram */
- rec = &priv->dgram.r[header->count];
+ rec = &dgram->r[header->count];
header->count ++;
}
- /* Fill in export record */
+ KASSERT(header->count <= NETFLOW_V5_MAX_RECORDS,
+ ("ng_netflow: export too big"));
+
+ /* Fill in export record. */
rec->src_addr = fle->f.r.r_src.s_addr;
rec->dst_addr = fle->f.r.r_dst.s_addr;
rec->next_hop = fle->f.next_hop.s_addr;
@@ -621,97 +675,67 @@ export_add(priv_p priv, struct flow_entry *fle)
rec->dst_mask = fle->f.dst_mask;
rec->src_mask = fle->f.src_mask;
- priv->flow_seq++;
+ /* Not supported fields. */
+ rec->src_as = rec->dst_as = 0;
- if (header->count == NETFLOW_V5_MAX_RECORDS) /* end of datagram */
- return export_send(priv);
-
- return (0);
+ if (header->count == NETFLOW_V5_MAX_RECORDS)
+ return (1); /* end of datagram */
+ else
+ return (0);
}
/* Periodic flow expiry run. */
void
ng_netflow_expire(void *arg)
{
- register struct flow_entry *fle, *fle1;
- priv_p priv = (priv_p )arg;
- uint32_t used;
- int error = 0;
-
- /* First pack actively expired entries */
- mtx_lock(&priv->expire_mtx);
- while (!SLIST_EMPTY(&(priv->expire_list))) {
- fle = SLIST_FIRST(&(priv->expire_list));
- SLIST_REMOVE_HEAD(&(priv->expire_list), fle_free);
- mtx_unlock(&priv->expire_mtx);
-
- /*
- * While we have dropped the lock, expire_flow() may
- * insert another flow into top of the list.
- * This is not harmful for us, since we have already
- * detached our own.
- */
-
- if ((error = export_add(priv, fle)) != 0)
- log(LOG_CRIT, "ng_netflow: export_add() failed: %u\n",
- error);
- (void )free_flow(priv, fle);
+ struct flow_entry *fle, *fle1;
+ struct flow_hash_entry *hsh;
+ priv_p priv = (priv_p )arg;
+ item_p item = NULL;
+ uint32_t used;
+ int i;
- mtx_lock(&priv->expire_mtx);
- }
- mtx_unlock(&priv->expire_mtx);
-
- NGNF_GETUSED(priv, used);
- mtx_lock(&priv->work_mtx);
- TAILQ_FOREACH_SAFE(fle, &(priv->work_queue), fle_work, fle1) {
+ /*
+ * Going through all the cache.
+ */
+ for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) {
/*
- * When cache size has not reached CACHELOWAT yet, we keep
- * both inactive and active flows in cache. Doing this, we
- * reduce number of exports, since many inactive flows may
- * wake up and continue their life. However, we make an
- * exclusion for scans. It is very rare situation that
- * inactive 1-packet flow will wake up.
- * When cache has reached CACHELOWAT, we expire all inactive
- * flows, until cache gets to a sane size.
+ * Skip entries, that are already being worked on.
*/
- if (used <= CACHELOWAT && !INACTIVE(fle))
- goto finish;
-
- if ((INACTIVE(fle) && (SMALL(fle) || (used > CACHELOWAT))) ||
- AGED(fle)) {
-
- /* Detach flow entry from cache */
- LIST_REMOVE(fle, fle_hash);
- TAILQ_REMOVE(&priv->work_queue, fle, fle_work);
+ if (mtx_trylock(&hsh->mtx) == 0)
+ continue;
+ used = atomic_load_acq_32(&priv->info.nfinfo_used);
+ TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
/*
- * While we are sending to collector, unlock cache.
- * XXX: it can happen, however with a small probability,
- * that item, we are holding now, can be moved to the
- * top of flow cache by node thread. In this case our
- * expire thread stops checking. Since this is not
- * fatal we will just ignore it now.
+ * Interrupt thread wants this entry!
+ * Quick! Quick! Bail out!
*/
- mtx_unlock(&priv->work_mtx);
-
- if ((error = export_add(priv, fle)) != 0)
- log(LOG_CRIT, "ng_netflow: export_add() "
- "failed: %u\n", error);
-
- used = free_flow(priv, fle);
+ if (hsh->mtx.mtx_lock & MTX_CONTESTED)
+ break;
- mtx_lock(&priv->work_mtx);
+ /*
+ * Don't expire aggressively while hash collision
+ * ratio is predicted small.
+ */
+ if (used <= (NBUCKETS*2) && !INACTIVE(fle))
+ break;
+
+ if ((INACTIVE(fle) && (SMALL(fle) || (used > (NBUCKETS*2)))) ||
+ AGED(fle)) {
+ TAILQ_REMOVE(&hsh->head, fle, fle_hash);
+ expire_flow(priv, &item, fle);
+ used--;
+ atomic_add_32(&priv->info.nfinfo_inact_exp, 1);
+ }
}
- }
-
-finish:
- mtx_unlock(&priv->work_mtx);
+ mtx_unlock(&hsh->mtx);
+ }
- mtx_assert(&priv->expire_mtx, MA_NOTOWNED);
- mtx_assert(&priv->free_mtx, MA_NOTOWNED);
+ if (item != NULL)
+ return_export_dgram(priv, item);
- /* schedule next expire */
+ /* Schedule next expire. */
callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire,
(void *)priv);
-
}
OpenPOWER on IntegriCloud