diff options
Diffstat (limited to 'drivers/staging/lustre/lustre/obdclass/llog.c')
-rw-r--r-- | drivers/staging/lustre/lustre/obdclass/llog.c | 139 |
1 files changed, 95 insertions, 44 deletions
diff --git a/drivers/staging/lustre/lustre/obdclass/llog.c b/drivers/staging/lustre/lustre/obdclass/llog.c index 43797f1..736ea10 100644 --- a/drivers/staging/lustre/lustre/obdclass/llog.c +++ b/drivers/staging/lustre/lustre/obdclass/llog.c @@ -43,8 +43,9 @@ #define DEBUG_SUBSYSTEM S_LOG -#include "../include/obd_class.h" +#include "../include/llog_swab.h" #include "../include/lustre_log.h" +#include "../include/obd_class.h" #include "llog_internal.h" /* @@ -80,8 +81,7 @@ static void llog_free_handle(struct llog_handle *loghandle) LASSERT(list_empty(&loghandle->u.phd.phd_entry)); else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) LASSERT(list_empty(&loghandle->u.chd.chd_head)); - LASSERT(sizeof(*loghandle->lgh_hdr) == LLOG_CHUNK_SIZE); - kfree(loghandle->lgh_hdr); + kvfree(loghandle->lgh_hdr); out: kfree(loghandle); } @@ -115,20 +115,29 @@ static int llog_read_header(const struct lu_env *env, rc = lop->lop_read_header(env, handle); if (rc == LLOG_EEMPTY) { struct llog_log_hdr *llh = handle->lgh_hdr; + size_t len; + /* lrh_len should be initialized in llog_init_handle */ handle->lgh_last_idx = 0; /* header is record with index 0 */ llh->llh_count = 1; /* for the header record */ llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC; - llh->llh_hdr.lrh_len = LLOG_CHUNK_SIZE; - llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE; + LASSERT(handle->lgh_ctxt->loc_chunk_size >= LLOG_MIN_CHUNK_SIZE); + llh->llh_hdr.lrh_len = handle->lgh_ctxt->loc_chunk_size; llh->llh_hdr.lrh_index = 0; - llh->llh_tail.lrt_index = 0; llh->llh_timestamp = ktime_get_real_seconds(); if (uuid) memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid)); llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap); - ext2_set_bit(0, llh->llh_bitmap); + /* + * Since update llog header might also call this function, + * let's reset the bitmap to 0 here + */ + len = llh->llh_hdr.lrh_len - llh->llh_bitmap_offset; + memset(LLOG_HDR_BITMAP(llh), 0, len - sizeof(llh->llh_tail)); + ext2_set_bit(0, LLOG_HDR_BITMAP(llh)); + LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len; + LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index; rc = 0; } return rc; @@ -137,16 +146,19 @@ static int llog_read_header(const struct lu_env *env, int llog_init_handle(const struct lu_env *env, struct llog_handle *handle, int flags, struct obd_uuid *uuid) { + int chunk_size = handle->lgh_ctxt->loc_chunk_size; enum llog_flag fmt = flags & LLOG_F_EXT_MASK; struct llog_log_hdr *llh; int rc; LASSERT(!handle->lgh_hdr); - llh = kzalloc(sizeof(*llh), GFP_NOFS); + LASSERT(chunk_size >= LLOG_MIN_CHUNK_SIZE); + llh = libcfs_kvzalloc(sizeof(*llh), GFP_NOFS); if (!llh) return -ENOMEM; handle->lgh_hdr = llh; + handle->lgh_hdr_size = chunk_size; /* first assign flags to use llog_client_ops */ llh->llh_flags = flags; rc = llog_read_header(env, handle, uuid); @@ -189,6 +201,7 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle, LASSERT(list_empty(&handle->u.chd.chd_head)); INIT_LIST_HEAD(&handle->u.chd.chd_head); llh->llh_size = sizeof(struct llog_logid_rec); + llh->llh_flags |= LLOG_F_IS_FIXSIZE; } else if (!(flags & LLOG_F_IS_PLAIN)) { CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n", handle->lgh_ctxt->loc_obd->obd_name, @@ -198,7 +211,7 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle, llh->llh_flags |= fmt; out: if (rc) { - kfree(llh); + kvfree(llh); handle->lgh_hdr = NULL; } return rc; @@ -212,15 +225,21 @@ static int llog_process_thread(void *arg) struct llog_log_hdr *llh = loghandle->lgh_hdr; struct llog_process_cat_data *cd = lpi->lpi_catdata; char *buf; - __u64 cur_offset = LLOG_CHUNK_SIZE; - __u64 last_offset; + u64 cur_offset, tmp_offset; + int chunk_size; int rc = 0, index = 1, last_index; int saved_index = 0; int last_called_index = 0; - LASSERT(llh); + if (!llh) + return -EINVAL; + + cur_offset = llh->llh_hdr.lrh_len; + chunk_size = llh->llh_hdr.lrh_len; + /* expect chunk_size to be power of two */ + LASSERT(is_power_of_2(chunk_size)); - buf = kzalloc(LLOG_CHUNK_SIZE, GFP_NOFS); + buf = libcfs_kvzalloc(chunk_size, GFP_NOFS); if (!buf) { lpi->lpi_rc = -ENOMEM; return 0; @@ -233,41 +252,53 @@ static int llog_process_thread(void *arg) if (cd && cd->lpcd_last_idx) last_index = cd->lpcd_last_idx; else - last_index = LLOG_BITMAP_BYTES * 8 - 1; - - /* Record is not in this buffer. */ - if (index > last_index) - goto out; + last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1; while (rc == 0) { + unsigned int buf_offset = 0; struct llog_rec_hdr *rec; + bool partial_chunk; + off_t chunk_offset; /* skip records not set in bitmap */ while (index <= last_index && - !ext2_test_bit(index, llh->llh_bitmap)) + !ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) ++index; - LASSERT(index <= last_index + 1); - if (index == last_index + 1) + if (index > last_index) break; -repeat: + CDEBUG(D_OTHER, "index: %d last_index %d\n", index, last_index); - +repeat: /* get the buf with our target record; avoid old garbage */ - memset(buf, 0, LLOG_CHUNK_SIZE); - last_offset = cur_offset; + memset(buf, 0, chunk_size); rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index, - index, &cur_offset, buf, LLOG_CHUNK_SIZE); + index, &cur_offset, buf, chunk_size); if (rc) goto out; + /* + * NB: after llog_next_block() call the cur_offset is the + * offset of the next block after read one. + * The absolute offset of the current chunk is calculated + * from cur_offset value and stored in chunk_offset variable. + */ + tmp_offset = cur_offset; + if (do_div(tmp_offset, chunk_size)) { + partial_chunk = true; + chunk_offset = cur_offset & ~(chunk_size - 1); + } else { + partial_chunk = false; + chunk_offset = cur_offset - chunk_size; + } + /* NB: when rec->lrh_len is accessed it is already swabbed * since it is used at the "end" of the loop and the rec * swabbing is done at the beginning of the loop. */ - for (rec = (struct llog_rec_hdr *)buf; - (char *)rec < buf + LLOG_CHUNK_SIZE; + for (rec = (struct llog_rec_hdr *)(buf + buf_offset); + (char *)rec < buf + chunk_size; rec = llog_rec_hdr_next(rec)) { CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n", rec, rec->lrh_type); @@ -278,15 +309,29 @@ repeat: CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n", rec->lrh_type, rec->lrh_index); - if (rec->lrh_index == 0) { - /* probably another rec just got added? */ - rc = 0; - if (index <= loghandle->lgh_last_idx) - goto repeat; - goto out; /* no more records */ + /* + * for partial chunk the end of it is zeroed, check + * for index 0 to distinguish it. + */ + if (partial_chunk && !rec->lrh_index) { + /* concurrent llog_add() might add new records + * while llog_processing, check this is not + * the case and re-read the current chunk + * otherwise. + */ + if (index > loghandle->lgh_last_idx) { + rc = 0; + goto out; + } + CDEBUG(D_OTHER, "Re-read last llog buffer for new records, index %u, last %u\n", + index, loghandle->lgh_last_idx); + /* save offset inside buffer for the re-read */ + buf_offset = (char *)rec - (char *)buf; + cur_offset = chunk_offset; + goto repeat; } - if (rec->lrh_len == 0 || - rec->lrh_len > LLOG_CHUNK_SIZE) { + + if (!rec->lrh_len || rec->lrh_len > chunk_size) { CWARN("invalid length %d in llog record for index %d/%d\n", rec->lrh_len, rec->lrh_index, index); @@ -300,32 +345,38 @@ repeat: continue; } + if (rec->lrh_index != index) { + CERROR("%s: Invalid record: index %u but expected %u\n", + loghandle->lgh_ctxt->loc_obd->obd_name, + rec->lrh_index, index); + rc = -ERANGE; + goto out; + } + CDEBUG(D_OTHER, "lrh_index: %d lrh_len: %d (%d remains)\n", rec->lrh_index, rec->lrh_len, - (int)(buf + LLOG_CHUNK_SIZE - (char *)rec)); + (int)(buf + chunk_size - (char *)rec)); loghandle->lgh_cur_idx = rec->lrh_index; loghandle->lgh_cur_offset = (char *)rec - (char *)buf + - last_offset; + chunk_offset; /* if set, process the callback on this record */ - if (ext2_test_bit(index, llh->llh_bitmap)) { + if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) { rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec, lpi->lpi_cbdata); last_called_index = index; if (rc) goto out; - } else { - CDEBUG(D_OTHER, "Skipped index %d\n", index); } - /* next record, still in buffer? */ - ++index; - if (index > last_index) { + /* exit if the last index is reached */ + if (index >= last_index) { rc = 0; goto out; } + index++; } } |