diff options
author | glebius <glebius@FreeBSD.org> | 2005-05-11 11:26:24 +0000 |
---|---|---|
committer | glebius <glebius@FreeBSD.org> | 2005-05-11 11:26:24 +0000 |
commit | 8455545285d4f5cb42b42bade7aceafc22d1cd59 (patch) | |
tree | a20a65ec59042108874f8178d54da871a4d1f80b /sys/netgraph/netflow/netflow.c | |
parent | 6413eea55ab7218f882f99d922684166d739e1f4 (diff) | |
download | FreeBSD-src-8455545285d4f5cb42b42bade7aceafc22d1cd59.zip FreeBSD-src-8455545285d4f5cb42b42bade7aceafc22d1cd59.tar.gz |
A new version of NetFlow node.
The most significant changes are:
- Use UMA zone instead of own chunk of memory.
- Lock each hash entry separately.
- Expire items "actively" - interrupt method can expire flows
from hash slot, when it searches through it.
- Remove global tailqueue. Make callout thread search through
every hash slot.
- Export datagram is detached from private data and filled. If
it is incomplete, it is attached back. Another thread will
continue working with it.
Lesser, but also important speedups:
- Flows in hash slot are stored in tailqueue. Whenever a flow is
hit, it is moved to the begging, so it can be located quicker.
- When callout thread works with hash slot it bails out if
slot mutex is contested.
Diffstat (limited to 'sys/netgraph/netflow/netflow.c')
-rw-r--r-- | sys/netgraph/netflow/netflow.c | 562 |
1 files changed, 293 insertions, 269 deletions
diff --git a/sys/netgraph/netflow/netflow.c b/sys/netgraph/netflow/netflow.c index 296849d4..4ed9e19 100644 --- a/sys/netgraph/netflow/netflow.c +++ b/sys/netgraph/netflow/netflow.c @@ -38,9 +38,9 @@ static const char rcs_id[] = #include <sys/systm.h> #include <sys/socket.h> +#include <machine/atomic.h> + #include <net/if.h> -#include <net/if_var.h> -#include <net/if_dl.h> #include <net/route.h> #include <netinet/in.h> #include <netinet/in_systm.h> @@ -54,7 +54,7 @@ static const char rcs_id[] = #include <netgraph/netflow/netflow.h> #include <netgraph/netflow/ng_netflow.h> -#define NBUCKETS (4096) /* must be power of 2 */ +#define NBUCKETS (65536) /* must be power of 2 */ /* This hash is for TCP or UDP packets */ #define FULL_HASH(addr1,addr2,port1,port2)\ @@ -63,7 +63,7 @@ static const char rcs_id[] = ((port1 ^ port2) << 8) )& \ (NBUCKETS - 1)) -/* This hash for all other IP packets */ +/* This hash is for all other IP packets */ #define ADDR_HASH(addr1,addr2)\ (((addr1 >> 16) ^ \ (addr2 & 0x00FF) )& \ @@ -95,13 +95,13 @@ static const char rcs_id[] = ((t) << 5) + /* 32 */ \ ((t) << 3)) /* 8 */ -MALLOC_DECLARE(M_NETFLOW); -MALLOC_DEFINE(M_NETFLOW, "NetFlow", "flow cache"); +MALLOC_DECLARE(M_NETFLOW_HASH); +MALLOC_DEFINE(M_NETFLOW_HASH, "NetFlow hash", "NetFlow hash"); -static int export_add(priv_p , struct flow_entry *); -static int export_send(priv_p ); +static int export_add(item_p, struct flow_entry *); +static int export_send(priv_p, item_p); -/* Generate hash for a given flow record */ +/* Generate hash for a given flow record. */ static __inline uint32_t ip_hash(struct flow_rec *r) { @@ -115,88 +115,113 @@ ip_hash(struct flow_rec *r) } } -/* Lookup for record in given slot */ -static __inline struct flow_entry * -hash_lookup(struct flow_hash_entry *h, int slot, struct flow_rec *r) +/* This is callback from uma(9), called on alloc. */ +static int +uma_ctor_flow(void *mem, int size, void *arg, int how) { - struct flow_entry *fle; + priv_p priv = (priv_p )arg; + + if (atomic_load_acq_32(&priv->info.nfinfo_used) >= CACHESIZE) + return (ENOMEM); - LIST_FOREACH(fle, &(h[slot].head), fle_hash) - if (bcmp(r, &fle->f.r, sizeof(struct flow_rec)) == 0) - return (fle); + atomic_add_32(&priv->info.nfinfo_used, 1); - return (NULL); + return (0); } -/* Get a flow entry from free list */ -static __inline struct flow_entry * -alloc_flow(priv_p priv, int *flows) +/* This is callback from uma(9), called on free. */ +static void +uma_dtor_flow(void *mem, int size, void *arg) { - register struct flow_entry *fle; - - mtx_lock(&priv->free_mtx); - - if (SLIST_EMPTY(&priv->free_list)) { - mtx_unlock(&priv->free_mtx); - return(NULL); - } + priv_p priv = (priv_p )arg; - fle = SLIST_FIRST(&priv->free_list); - SLIST_REMOVE_HEAD(&priv->free_list, fle_free); + atomic_subtract_32(&priv->info.nfinfo_used, 1); +} - priv->info.nfinfo_used++; - priv->info.nfinfo_free--; +/* + * Detach export datagram from priv, if there is any. + * If there is no, allocate a new one. + */ +static item_p +get_export_dgram(priv_p priv) +{ + item_p item = NULL; - if (flows != NULL) - *flows = priv->info.nfinfo_used; + mtx_lock(&priv->export_mtx); + if (priv->export_item != NULL) { + item = priv->export_item; + priv->export_item = NULL; + } + mtx_unlock(&priv->export_mtx); + + if (item == NULL) { + struct netflow_v5_export_dgram *dgram; + struct mbuf *m; + + m = m_getcl(M_DONTWAIT, MT_DATA, M_PKTHDR); + if (m == NULL) + return (NULL); + item = ng_package_data(m, NULL); + if (item == NULL) + return (NULL); + dgram = mtod(m, struct netflow_v5_export_dgram *); + dgram->header.count = 0; + dgram->header.version = htons(NETFLOW_V5); - mtx_unlock(&priv->free_mtx); + } - return (fle); + return (item); } -/* Insert flow entry into a free list. */ -static __inline int -free_flow(priv_p priv, struct flow_entry *fle) +/* + * Re-attach incomplete datagram back to priv. + * If there is already another one, then send incomplete. */ +static void +return_export_dgram(priv_p priv, item_p item) { - int flows; - - mtx_lock(&priv->free_mtx); - fle->f.packets = 0; - SLIST_INSERT_HEAD(&priv->free_list, fle, fle_free); - flows = priv->info.nfinfo_used--; - priv->info.nfinfo_free++; - mtx_unlock(&priv->free_mtx); - - return flows; + /* + * It may happen on SMP, that some thread has already + * put its item there, in this case we bail out and + * send what we have to collector. + */ + mtx_lock(&priv->export_mtx); + if (priv->export_item == NULL) { + priv->export_item = item; + mtx_unlock(&priv->export_mtx); + } else { + mtx_unlock(&priv->export_mtx); + export_send(priv, item); + } } -#define NGNF_GETUSED(priv, rval) do { \ - mtx_lock(&priv->free_mtx); \ - rval = priv->info.nfinfo_used; \ - mtx_unlock(&priv->free_mtx); \ - } while (0) - -/* Insert flow entry into expire list. */ -/* XXX: Flow must be detached from work queue, but not from cache */ +/* + * The flow is over. Call export_add() and free it. If datagram is + * full, then call export_send(). + */ static __inline void -expire_flow(priv_p priv, struct flow_entry *fle) +expire_flow(priv_p priv, item_p *item, struct flow_entry *fle) { - mtx_assert(&priv->work_mtx, MA_OWNED); - LIST_REMOVE(fle, fle_hash); - - mtx_lock(&priv->expire_mtx); - SLIST_INSERT_HEAD(&priv->expire_list, fle, fle_free); - mtx_unlock(&priv->expire_mtx); + if (*item == NULL) + *item = get_export_dgram(priv); + if (*item == NULL) { + /* XXX: do stats! */ + log(LOG_DEBUG, "get_export_dgram failed\n"); + uma_zfree_arg(priv->zone, fle, priv); + return; + } + if (export_add(*item, fle) > 0) { + export_send(priv, *item); + *item = NULL; + } + uma_zfree_arg(priv->zone, fle, priv); } /* Get a snapshot of node statistics */ void ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i) { - mtx_lock(&priv->free_mtx); + /* XXX: atomic */ memcpy((void *)i, (void *)&priv->info, sizeof(priv->info)); - mtx_unlock(&priv->free_mtx); } /* Calculate number of bits in netmask */ @@ -216,21 +241,27 @@ bit_count(uint32_t v) * Insert a record into defined slot. * * First we get for us a free flow entry, then fill in all - * possible fields in it. Then obtain lock on flow cache - * and insert flow entry. + * possible fields in it. + * + * TODO: consider dropping hash mutex while filling in datagram, + * as this was done in previous version. Need to test & profile + * to be sure. */ static __inline int -hash_insert(priv_p priv, int slot, struct flow_rec *r, int plen, - uint8_t tcp_flags) +hash_insert(priv_p priv, struct flow_hash_entry *hsh, struct flow_rec *r, + int plen, uint8_t tcp_flags) { - struct flow_hash_entry *h = priv->hash; struct flow_entry *fle; struct route ro; struct sockaddr_in *sin; - fle = alloc_flow(priv, NULL); - if (fle == NULL) + mtx_assert(&hsh->mtx, MA_OWNED); + + fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT); + if (fle == NULL) { + atomic_add_32(&priv->info.nfinfo_failed, 1); return (ENOMEM); + } /* * Now fle is totally ours. It is detached from all lists, @@ -295,11 +326,8 @@ hash_insert(priv_p priv, int slot, struct flow_rec *r, int plen, RTFREE(ro.ro_rt); } - /* Push new flow entry into flow cache */ - mtx_lock(&priv->work_mtx); - LIST_INSERT_HEAD(&(h[slot].head), fle, fle_hash); - TAILQ_INSERT_TAIL(&priv->work_queue, fle, fle_work); - mtx_unlock(&priv->work_mtx); + /* Push new flow at the and of hash. */ + TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash); return (0); } @@ -313,40 +341,31 @@ hash_insert(priv_p priv, int slot, struct flow_rec *r, int plen, int ng_netflow_cache_init(priv_p priv) { - struct flow_entry *fle; + struct flow_hash_entry *hsh; int i; - /* allocate cache */ - MALLOC(priv->cache, struct flow_entry *, - CACHESIZE * sizeof(struct flow_entry), - M_NETFLOW, M_WAITOK | M_ZERO); - - if (priv->cache == NULL) - return (ENOMEM); + /* Initialize cache UMA zone. */ + priv->zone = uma_zcreate("NetFlow cache", sizeof(struct flow_entry), + uma_ctor_flow, uma_dtor_flow, NULL, NULL, UMA_ALIGN_CACHE, 0); + uma_zone_set_max(priv->zone, CACHESIZE); - /* allocate hash */ + /* Allocate hash. */ MALLOC(priv->hash, struct flow_hash_entry *, NBUCKETS * sizeof(struct flow_hash_entry), - M_NETFLOW, M_WAITOK | M_ZERO); + M_NETFLOW_HASH, M_WAITOK | M_ZERO); if (priv->hash == NULL) { - FREE(priv->cache, M_NETFLOW); + uma_zdestroy(priv->zone); return (ENOMEM); } - TAILQ_INIT(&priv->work_queue); - SLIST_INIT(&priv->free_list); - SLIST_INIT(&priv->expire_list); - - mtx_init(&priv->work_mtx, "ng_netflow cache mutex", NULL, MTX_DEF); - mtx_init(&priv->free_mtx, "ng_netflow free mutex", NULL, MTX_DEF); - mtx_init(&priv->expire_mtx, "ng_netflow expire mutex", NULL, MTX_DEF); - - /* build free list */ - for (i = 0, fle = priv->cache; i < CACHESIZE; i++, fle++) - SLIST_INSERT_HEAD(&priv->free_list, fle, fle_free); + /* Initialize hash. */ + for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) { + mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF); + TAILQ_INIT(&hsh->head); + } - priv->info.nfinfo_free = CACHESIZE; + mtx_init(&priv->export_mtx, "export dgram lock", NULL, MTX_DEF); return (0); } @@ -355,7 +374,9 @@ ng_netflow_cache_init(priv_p priv) void ng_netflow_cache_flush(priv_p priv) { - register struct flow_entry *fle; + struct flow_entry *fle, *fle1; + struct flow_hash_entry *hsh; + item_p item = NULL; int i; /* @@ -363,36 +384,39 @@ ng_netflow_cache_flush(priv_p priv) * Expire everything before freeing it. * No locking is required since callout is already drained. */ + for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) + TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) { + TAILQ_REMOVE(&hsh->head, fle, fle_hash); + expire_flow(priv, &item, fle); + } - for (i = 0, fle = priv->cache; i < CACHESIZE; i++, fle++) - if (!ISFREE(fle)) - /* ignore errors now */ - (void )export_add(priv, fle); + if (item != NULL) + export_send(priv, item); - mtx_destroy(&priv->work_mtx); - mtx_destroy(&priv->free_mtx); - mtx_destroy(&priv->expire_mtx); + uma_zdestroy(priv->zone); - /* free hash memory */ - if (priv->hash) - FREE(priv->hash, M_NETFLOW); + /* Destroy hash mutexes. */ + for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) + mtx_destroy(&hsh->mtx); - /* free flow cache */ - if (priv->cache) - FREE(priv->cache, M_NETFLOW); + /* Free hash memory. */ + if (priv->hash) + FREE(priv->hash, M_NETFLOW_HASH); + mtx_destroy(&priv->export_mtx); } -/* Insert packet from &m into flow cache. */ +/* Insert packet from into flow cache. */ int ng_netflow_flow_add(priv_p priv, struct ip *ip, iface_p iface, struct ifnet *ifp) { - struct flow_hash_entry *h = priv->hash; - register struct flow_entry *fle; + register struct flow_entry *fle, *fle1; + struct flow_hash_entry *hsh; struct flow_rec r; + item_p item = NULL; int hlen, plen; - uint32_t slot; + int error = 0; uint8_t tcp_flags = 0; /* Try to fill flow_rec r */ @@ -449,19 +473,32 @@ ng_netflow_flow_add(priv_p priv, struct ip *ip, iface_p iface, break; } - slot = ip_hash(&r); - - mtx_lock(&priv->work_mtx); - - /* Update node statistics. */ + /* Update node statistics. XXX: race... */ priv->info.nfinfo_packets ++; priv->info.nfinfo_bytes += plen; - fle = hash_lookup(h, slot, &r); /* New flow entry or existent? */ + /* Find hash slot. */ + hsh = &priv->hash[ip_hash(&r)]; - if (fle) { /* an existent entry */ + mtx_lock(&hsh->mtx); + + /* + * Go through hash and find our entry. If we encounter an + * entry, that should be expired, purge it. We do a reverse + * search since most active entries are first, and most + * searches are done on most active entries. + */ + TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) { + if (bcmp(&r, &fle->f.r, sizeof(struct flow_rec)) == 0) + break; + if ((INACTIVE(fle) && SMALL(fle)) || AGED(fle)) { + TAILQ_REMOVE(&hsh->head, fle, fle_hash); + expire_flow(priv, &item, fle); + atomic_add_32(&priv->info.nfinfo_act_exp, 1); + } + } - TAILQ_REMOVE(&priv->work_queue, fle, fle_work); + if (fle) { /* An existent entry. */ fle->f.bytes += plen; fle->f.packets ++; @@ -475,135 +512,152 @@ ng_netflow_flow_add(priv_p priv, struct ip *ip, iface_p iface, * - it is going to overflow counter */ if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) || - (fle->f.bytes >= (UINT_MAX - IF_MAXMTU)) ) - expire_flow(priv, fle); - else - TAILQ_INSERT_TAIL(&priv->work_queue, fle, fle_work); - - mtx_unlock(&priv->work_mtx); - - } else { /* a new flow entry */ - - mtx_unlock(&priv->work_mtx); - return hash_insert(priv, slot, &r, plen, tcp_flags); + (fle->f.bytes >= (UINT_MAX - IF_MAXMTU)) ) { + TAILQ_REMOVE(&hsh->head, fle, fle_hash); + expire_flow(priv, &item, fle); + } else { + /* + * It is the newest, move it to the tail, + * if it isn't there already. Next search will + * locate it quicker. + */ + if (fle != TAILQ_LAST(&hsh->head, fhead)) { + TAILQ_REMOVE(&hsh->head, fle, fle_hash); + TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash); + } + } + } else /* A new flow entry. */ + error = hash_insert(priv, hsh, &r, plen, tcp_flags); - } + mtx_unlock(&hsh->mtx); - mtx_assert(&priv->work_mtx, MA_NOTOWNED); - mtx_assert(&priv->expire_mtx, MA_NOTOWNED); - mtx_assert(&priv->free_mtx, MA_NOTOWNED); + if (item != NULL) + return_export_dgram(priv, item); - return (0); + return (error); } /* - * Return records from cache. netgraph(4) guarantees us that we - * are locked against ng_netflow_rcvdata(). However we can - * work with ng_netflow_expire() in parrallel. XXX: Is it dangerous? + * Return records from cache to userland. * + * TODO: consider NGM_READONLY * TODO: matching particular IP should be done in kernel, here. */ int ng_netflow_flow_show(priv_p priv, uint32_t last, struct ng_mesg *resp) { + struct flow_hash_entry *hsh; struct flow_entry *fle; struct ngnf_flows *data; + int i; data = (struct ngnf_flows *)resp->data; data->last = 0; data->nentries = 0; /* Check if this is a first run */ - if (last == 0) - fle = priv->cache; - else { - if (last > CACHESIZE-1) + if (last == 0) { + hsh = priv->hash; + i = 0; + } else { + if (last > NBUCKETS-1) return (EINVAL); - fle = priv->cache + last; + hsh = priv->hash + last; + i = last; } /* * We will transfer not more than NREC_AT_ONCE. More data * will come in next message. - * We send current stop point to userland, and userland should return - * it back to us. + * We send current hash index to userland, and userland should + * return it back to us. Then, we will restart with new entry. + * + * The resulting cache snapshot is inaccurate for the + * following reasons: + * - we skip locked hash entries + * - we bail out, if someone wants our entry + * - we skip rest of entry, when hit NREC_AT_ONCE */ - for (; last < CACHESIZE; fle++, last++) { - if (ISFREE(fle)) + for (; i < NBUCKETS; hsh++, i++) { + if (mtx_trylock(&hsh->mtx) == 0) continue; - bcopy(&fle->f, &(data->entries[data->nentries]), - sizeof(fle->f)); - data->nentries ++; - if (data->nentries == NREC_AT_ONCE) { - if (++last < CACHESIZE) - data->last = (++fle - priv->cache); - return (0); + + TAILQ_FOREACH(fle, &hsh->head, fle_hash) { + if (hsh->mtx.mtx_lock & MTX_CONTESTED) + break; + + bcopy(&fle->f, &(data->entries[data->nentries]), + sizeof(fle->f)); + data->nentries++; + if (data->nentries == NREC_AT_ONCE) { + mtx_unlock(&hsh->mtx); + if (++i < NBUCKETS) + data->last = i; + return (0); + } } - } + mtx_unlock(&hsh->mtx); + } return (0); } /* We have full datagram in privdata. Send it to export hook. */ static int -export_send(priv_p priv) +export_send(priv_p priv, item_p item) { - struct netflow_v5_header *header = &priv->dgram.header; + struct mbuf *m = NGI_M(item); + struct netflow_v5_export_dgram *dgram = mtod(m, + struct netflow_v5_export_dgram *); + struct netflow_v5_header *header = &dgram->header; struct timespec ts; - struct mbuf *m; int error = 0; - int mlen; - header->sys_uptime = htonl(MILLIUPTIME(time_uptime)); + /* Fill mbuf header. */ + m->m_len = m->m_pkthdr.len = sizeof(struct netflow_v5_record) * + header->count + sizeof(struct netflow_v5_header); + /* Fill export header. */ + header->sys_uptime = htonl(MILLIUPTIME(time_uptime)); getnanotime(&ts); header->unix_secs = htonl(ts.tv_sec); header->unix_nsecs = htonl(ts.tv_nsec); - - /* Flow sequence contains number of first record */ - header->flow_seq = htonl(priv->flow_seq - header->count); - - mlen = sizeof(struct netflow_v5_header) + - sizeof(struct netflow_v5_record) * header->count; - header->count = htons(header->count); - if ((m = m_devget((caddr_t)header, mlen, 0, NULL, NULL)) == NULL) { - log(LOG_CRIT, "ng_netflow: m_devget() failed, losing export " - "dgram\n"); - header->count = 0; - return(ENOBUFS); - } - - header->count = 0; + header->flow_seq = htonl(atomic_load_acq_32(&priv->flow_seq)); - /* Giant is required in sosend() at this moment. */ - NET_LOCK_GIANT(); - NG_SEND_DATA_ONLY(error, priv->export, m); - NET_UNLOCK_GIANT(); + /* Flow sequence contains number of first record, so it + is updated after being put in header. */ + atomic_add_32(&priv->flow_seq, header->count); - if (error) - NG_FREE_M(m); + if (priv->export != NULL) + /* Should also NET_LOCK_GIANT(). */ + NG_FWD_ITEM_HOOK(error, item, priv->export); return (error); } -/* Create export datagram. */ +/* Add export record to dgram. */ static int -export_add(priv_p priv, struct flow_entry *fle) +export_add(item_p item, struct flow_entry *fle) { - struct netflow_v5_header *header = &priv->dgram.header; + struct netflow_v5_export_dgram *dgram = mtod(NGI_M(item), + struct netflow_v5_export_dgram *); + struct netflow_v5_header *header = &dgram->header; struct netflow_v5_record *rec; if (header->count == 0 ) { /* first record */ - rec = &priv->dgram.r[0]; + rec = &dgram->r[0]; header->count = 1; } else { /* continue filling datagram */ - rec = &priv->dgram.r[header->count]; + rec = &dgram->r[header->count]; header->count ++; } - /* Fill in export record */ + KASSERT(header->count <= NETFLOW_V5_MAX_RECORDS, + ("ng_netflow: export too big")); + + /* Fill in export record. */ rec->src_addr = fle->f.r.r_src.s_addr; rec->dst_addr = fle->f.r.r_dst.s_addr; rec->next_hop = fle->f.next_hop.s_addr; @@ -621,97 +675,67 @@ export_add(priv_p priv, struct flow_entry *fle) rec->dst_mask = fle->f.dst_mask; rec->src_mask = fle->f.src_mask; - priv->flow_seq++; + /* Not supported fields. */ + rec->src_as = rec->dst_as = 0; - if (header->count == NETFLOW_V5_MAX_RECORDS) /* end of datagram */ - return export_send(priv); - - return (0); + if (header->count == NETFLOW_V5_MAX_RECORDS) + return (1); /* end of datagram */ + else + return (0); } /* Periodic flow expiry run. */ void ng_netflow_expire(void *arg) { - register struct flow_entry *fle, *fle1; - priv_p priv = (priv_p )arg; - uint32_t used; - int error = 0; - - /* First pack actively expired entries */ - mtx_lock(&priv->expire_mtx); - while (!SLIST_EMPTY(&(priv->expire_list))) { - fle = SLIST_FIRST(&(priv->expire_list)); - SLIST_REMOVE_HEAD(&(priv->expire_list), fle_free); - mtx_unlock(&priv->expire_mtx); - - /* - * While we have dropped the lock, expire_flow() may - * insert another flow into top of the list. - * This is not harmful for us, since we have already - * detached our own. - */ - - if ((error = export_add(priv, fle)) != 0) - log(LOG_CRIT, "ng_netflow: export_add() failed: %u\n", - error); - (void )free_flow(priv, fle); + struct flow_entry *fle, *fle1; + struct flow_hash_entry *hsh; + priv_p priv = (priv_p )arg; + item_p item = NULL; + uint32_t used; + int i; - mtx_lock(&priv->expire_mtx); - } - mtx_unlock(&priv->expire_mtx); - - NGNF_GETUSED(priv, used); - mtx_lock(&priv->work_mtx); - TAILQ_FOREACH_SAFE(fle, &(priv->work_queue), fle_work, fle1) { + /* + * Going through all the cache. + */ + for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) { /* - * When cache size has not reached CACHELOWAT yet, we keep - * both inactive and active flows in cache. Doing this, we - * reduce number of exports, since many inactive flows may - * wake up and continue their life. However, we make an - * exclusion for scans. It is very rare situation that - * inactive 1-packet flow will wake up. - * When cache has reached CACHELOWAT, we expire all inactive - * flows, until cache gets to a sane size. + * Skip entries, that are already being worked on. */ - if (used <= CACHELOWAT && !INACTIVE(fle)) - goto finish; - - if ((INACTIVE(fle) && (SMALL(fle) || (used > CACHELOWAT))) || - AGED(fle)) { - - /* Detach flow entry from cache */ - LIST_REMOVE(fle, fle_hash); - TAILQ_REMOVE(&priv->work_queue, fle, fle_work); + if (mtx_trylock(&hsh->mtx) == 0) + continue; + used = atomic_load_acq_32(&priv->info.nfinfo_used); + TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) { /* - * While we are sending to collector, unlock cache. - * XXX: it can happen, however with a small probability, - * that item, we are holding now, can be moved to the - * top of flow cache by node thread. In this case our - * expire thread stops checking. Since this is not - * fatal we will just ignore it now. + * Interrupt thread wants this entry! + * Quick! Quick! Bail out! */ - mtx_unlock(&priv->work_mtx); - - if ((error = export_add(priv, fle)) != 0) - log(LOG_CRIT, "ng_netflow: export_add() " - "failed: %u\n", error); - - used = free_flow(priv, fle); + if (hsh->mtx.mtx_lock & MTX_CONTESTED) + break; - mtx_lock(&priv->work_mtx); + /* + * Don't expire aggressively while hash collision + * ratio is predicted small. + */ + if (used <= (NBUCKETS*2) && !INACTIVE(fle)) + break; + + if ((INACTIVE(fle) && (SMALL(fle) || (used > (NBUCKETS*2)))) || + AGED(fle)) { + TAILQ_REMOVE(&hsh->head, fle, fle_hash); + expire_flow(priv, &item, fle); + used--; + atomic_add_32(&priv->info.nfinfo_inact_exp, 1); + } } - } - -finish: - mtx_unlock(&priv->work_mtx); + mtx_unlock(&hsh->mtx); + } - mtx_assert(&priv->expire_mtx, MA_NOTOWNED); - mtx_assert(&priv->free_mtx, MA_NOTOWNED); + if (item != NULL) + return_export_dgram(priv, item); - /* schedule next expire */ + /* Schedule next expire. */ callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire, (void *)priv); - } |