diff options
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r-- | drivers/block/rbd.c | 359 |
1 files changed, 215 insertions, 144 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 26812c1..454bf9c 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -387,6 +387,7 @@ struct rbd_device { struct rw_semaphore lock_rwsem; enum rbd_lock_state lock_state; + char lock_cookie[32]; struct rbd_client_id owner_cid; struct work_struct acquired_lock_work; struct work_struct released_lock_work; @@ -477,13 +478,6 @@ static int minor_to_rbd_dev_id(int minor) return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; } -static bool rbd_is_lock_supported(struct rbd_device *rbd_dev) -{ - return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && - rbd_dev->spec->snap_id == CEPH_NOSNAP && - !rbd_dev->mapping.read_only; -} - static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) { return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || @@ -731,7 +725,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) kref_init(&rbdc->kref); INIT_LIST_HEAD(&rbdc->node); - rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0); + rbdc->client = ceph_create_client(ceph_opts, rbdc); if (IS_ERR(rbdc->client)) goto out_rbdc; ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ @@ -804,6 +798,7 @@ enum { Opt_read_only, Opt_read_write, Opt_lock_on_read, + Opt_exclusive, Opt_err }; @@ -816,6 +811,7 @@ static match_table_t rbd_opts_tokens = { {Opt_read_write, "read_write"}, {Opt_read_write, "rw"}, /* Alternate spelling */ {Opt_lock_on_read, "lock_on_read"}, + {Opt_exclusive, "exclusive"}, {Opt_err, NULL} }; @@ -823,11 +819,13 @@ struct rbd_options { int queue_depth; bool read_only; bool lock_on_read; + bool exclusive; }; #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ #define RBD_READ_ONLY_DEFAULT false #define RBD_LOCK_ON_READ_DEFAULT false +#define RBD_EXCLUSIVE_DEFAULT false static int parse_rbd_opts_token(char *c, void *private) { @@ -866,6 +864,9 @@ static int parse_rbd_opts_token(char *c, void *private) case Opt_lock_on_read: rbd_opts->lock_on_read = true; break; + case Opt_exclusive: + rbd_opts->exclusive = true; + break; default: /* libceph prints "bad option" msg */ return -EINVAL; @@ -3079,7 +3080,8 @@ static int rbd_lock(struct rbd_device *rbd_dev) char cookie[32]; int ret; - WARN_ON(__rbd_is_lock_owner(rbd_dev)); + WARN_ON(__rbd_is_lock_owner(rbd_dev) || + rbd_dev->lock_cookie[0] != '\0'); format_lock_cookie(rbd_dev, cookie); ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, @@ -3089,6 +3091,7 @@ static int rbd_lock(struct rbd_device *rbd_dev) return ret; rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; + strcpy(rbd_dev->lock_cookie, cookie); rbd_set_owner_cid(rbd_dev, &cid); queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); return 0; @@ -3097,27 +3100,24 @@ static int rbd_lock(struct rbd_device *rbd_dev) /* * lock_rwsem must be held for write */ -static int rbd_unlock(struct rbd_device *rbd_dev) +static void rbd_unlock(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - char cookie[32]; int ret; - WARN_ON(!__rbd_is_lock_owner(rbd_dev)); - - rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; + WARN_ON(!__rbd_is_lock_owner(rbd_dev) || + rbd_dev->lock_cookie[0] == '\0'); - format_lock_cookie(rbd_dev, cookie); ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, - RBD_LOCK_NAME, cookie); - if (ret && ret != -ENOENT) { - rbd_warn(rbd_dev, "cls_unlock failed: %d", ret); - return ret; - } + RBD_LOCK_NAME, rbd_dev->lock_cookie); + if (ret && ret != -ENOENT) + rbd_warn(rbd_dev, "failed to unlock: %d", ret); + /* treat errors as the image is unlocked */ + rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; + rbd_dev->lock_cookie[0] = '\0'; rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work); - return 0; } static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, @@ -3447,6 +3447,18 @@ again: ret = rbd_request_lock(rbd_dev); if (ret == -ETIMEDOUT) { goto again; /* treat this as a dead client */ + } else if (ret == -EROFS) { + rbd_warn(rbd_dev, "peer will not release lock"); + /* + * If this is rbd_add_acquire_lock(), we want to fail + * immediately -- reuse BLACKLISTED flag. Otherwise we + * want to block. + */ + if (!(rbd_dev->disk->flags & GENHD_FL_UP)) { + set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); + /* wake "rbd map --exclusive" process */ + wake_requests(rbd_dev, false); + } } else if (ret < 0) { rbd_warn(rbd_dev, "error requesting lock: %d", ret); mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, @@ -3490,16 +3502,15 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev) if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) return false; - if (!rbd_unlock(rbd_dev)) - /* - * Give others a chance to grab the lock - we would re-acquire - * almost immediately if we got new IO during ceph_osdc_sync() - * otherwise. We need to ack our own notifications, so this - * lock_dwork will be requeued from rbd_wait_state_locked() - * after wake_requests() in rbd_handle_released_lock(). - */ - cancel_delayed_work(&rbd_dev->lock_dwork); - + rbd_unlock(rbd_dev); + /* + * Give others a chance to grab the lock - we would re-acquire + * almost immediately if we got new IO during ceph_osdc_sync() + * otherwise. We need to ack our own notifications, so this + * lock_dwork will be requeued from rbd_wait_state_locked() + * after wake_requests() in rbd_handle_released_lock(). + */ + cancel_delayed_work(&rbd_dev->lock_dwork); return true; } @@ -3580,12 +3591,16 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, up_read(&rbd_dev->lock_rwsem); } -static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, - void **p) +/* + * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no + * ResponseMessage is needed. + */ +static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) { struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); struct rbd_client_id cid = { 0 }; - bool need_to_send; + int result = 1; if (struct_v >= 2) { cid.gid = ceph_decode_64(p); @@ -3595,19 +3610,36 @@ static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, cid.handle); if (rbd_cid_equal(&cid, &my_cid)) - return false; + return result; down_read(&rbd_dev->lock_rwsem); - need_to_send = __rbd_is_lock_owner(rbd_dev); - if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { - if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) { - dout("%s rbd_dev %p queueing unlock_work\n", __func__, - rbd_dev); - queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work); + if (__rbd_is_lock_owner(rbd_dev)) { + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED && + rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) + goto out_unlock; + + /* + * encode ResponseMessage(0) so the peer can detect + * a missing owner + */ + result = 0; + + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { + if (!rbd_dev->opts->exclusive) { + dout("%s rbd_dev %p queueing unlock_work\n", + __func__, rbd_dev); + queue_work(rbd_dev->task_wq, + &rbd_dev->unlock_work); + } else { + /* refuse to release the lock */ + result = -EROFS; + } } } + +out_unlock: up_read(&rbd_dev->lock_rwsem); - return need_to_send; + return result; } static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, @@ -3690,13 +3722,10 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, rbd_acknowledge_notify(rbd_dev, notify_id, cookie); break; case RBD_NOTIFY_OP_REQUEST_LOCK: - if (rbd_handle_request_lock(rbd_dev, struct_v, &p)) - /* - * send ResponseMessage(0) back so the client - * can detect a missing owner - */ + ret = rbd_handle_request_lock(rbd_dev, struct_v, &p); + if (ret <= 0) rbd_acknowledge_notify_result(rbd_dev, notify_id, - cookie, 0); + cookie, ret); else rbd_acknowledge_notify(rbd_dev, notify_id, cookie); break; @@ -3821,24 +3850,51 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev) ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); } +/* + * lock_rwsem must be held for write + */ +static void rbd_reacquire_lock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + char cookie[32]; + int ret; + + WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); + + format_lock_cookie(rbd_dev, cookie); + ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie, + RBD_LOCK_TAG, cookie); + if (ret) { + if (ret != -EOPNOTSUPP) + rbd_warn(rbd_dev, "failed to update lock cookie: %d", + ret); + + /* + * Lock cookie cannot be updated on older OSDs, so do + * a manual release and queue an acquire. + */ + if (rbd_release_lock(rbd_dev)) + queue_delayed_work(rbd_dev->task_wq, + &rbd_dev->lock_dwork, 0); + } else { + strcpy(rbd_dev->lock_cookie, cookie); + } +} + static void rbd_reregister_watch(struct work_struct *work) { struct rbd_device *rbd_dev = container_of(to_delayed_work(work), struct rbd_device, watch_dwork); - bool was_lock_owner = false; - bool need_to_wake = false; int ret; dout("%s rbd_dev %p\n", __func__, rbd_dev); - down_write(&rbd_dev->lock_rwsem); - if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) - was_lock_owner = rbd_release_lock(rbd_dev); - mutex_lock(&rbd_dev->watch_mutex); if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) { mutex_unlock(&rbd_dev->watch_mutex); - goto out; + return; } ret = __rbd_register_watch(rbd_dev); @@ -3846,36 +3902,28 @@ static void rbd_reregister_watch(struct work_struct *work) rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); if (ret == -EBLACKLISTED || ret == -ENOENT) { set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - need_to_wake = true; + wake_requests(rbd_dev, true); } else { queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, RBD_RETRY_DELAY); } mutex_unlock(&rbd_dev->watch_mutex); - goto out; + return; } - need_to_wake = true; rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; mutex_unlock(&rbd_dev->watch_mutex); + down_write(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) + rbd_reacquire_lock(rbd_dev); + up_write(&rbd_dev->lock_rwsem); + ret = rbd_dev_refresh(rbd_dev); if (ret) rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); - - if (was_lock_owner) { - ret = rbd_try_lock(rbd_dev); - if (ret) - rbd_warn(rbd_dev, "reregisteration lock failed: %d", - ret); - } - -out: - up_write(&rbd_dev->lock_rwsem); - if (need_to_wake) - wake_requests(rbd_dev, true); } /* @@ -4034,10 +4082,6 @@ static void rbd_queue_workfn(struct work_struct *work) if (op_type != OBJ_OP_READ) { snapc = rbd_dev->header.snapc; ceph_get_snap_context(snapc); - must_be_locked = rbd_is_lock_supported(rbd_dev); - } else { - must_be_locked = rbd_dev->opts->lock_on_read && - rbd_is_lock_supported(rbd_dev); } up_read(&rbd_dev->header_rwsem); @@ -4048,14 +4092,20 @@ static void rbd_queue_workfn(struct work_struct *work) goto err_rq; } + must_be_locked = + (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && + (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read); if (must_be_locked) { down_read(&rbd_dev->lock_rwsem); if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED && - !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) + !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { + if (rbd_dev->opts->exclusive) { + rbd_warn(rbd_dev, "exclusive lock required"); + result = -EROFS; + goto err_unlock; + } rbd_wait_state_locked(rbd_dev); - - WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^ - !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)); + } if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { result = -EBLACKLISTED; goto err_unlock; @@ -4114,19 +4164,10 @@ static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx, static void rbd_free_disk(struct rbd_device *rbd_dev) { - struct gendisk *disk = rbd_dev->disk; - - if (!disk) - return; - + blk_cleanup_queue(rbd_dev->disk->queue); + blk_mq_free_tag_set(&rbd_dev->tag_set); + put_disk(rbd_dev->disk); rbd_dev->disk = NULL; - if (disk->flags & GENHD_FL_UP) { - del_gendisk(disk); - if (disk->queue) - blk_cleanup_queue(disk->queue); - blk_mq_free_tag_set(&rbd_dev->tag_set); - } - put_disk(disk); } static int rbd_obj_read_sync(struct rbd_device *rbd_dev, @@ -4383,8 +4424,12 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC)) q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES; + /* + * disk_release() expects a queue ref from add_disk() and will + * put it. Hold an extra ref until add_disk() is called. + */ + WARN_ON(!blk_get_queue(q)); disk->queue = q; - q->queuedata = rbd_dev; rbd_dev->disk = disk; @@ -5624,6 +5669,7 @@ static int rbd_add_parse_args(const char *buf, rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; + rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT; copts = ceph_parse_options(options, mon_addrs, mon_addrs + mon_addrs_size - 1, @@ -5682,6 +5728,33 @@ again: return ret; } +static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) +{ + down_write(&rbd_dev->lock_rwsem); + if (__rbd_is_lock_owner(rbd_dev)) + rbd_unlock(rbd_dev); + up_write(&rbd_dev->lock_rwsem); +} + +static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) +{ + if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { + rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); + return -EINVAL; + } + + /* FIXME: "rbd map --exclusive" should be in interruptible */ + down_read(&rbd_dev->lock_rwsem); + rbd_wait_state_locked(rbd_dev); + up_read(&rbd_dev->lock_rwsem); + if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { + rbd_warn(rbd_dev, "failed to acquire exclusive lock"); + return -EROFS; + } + + return 0; +} + /* * An rbd format 2 image has a unique identifier, distinct from the * name given to it by the user. Internally, that identifier is @@ -5873,6 +5946,15 @@ out_err: return ret; } +static void rbd_dev_device_release(struct rbd_device *rbd_dev) +{ + clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); + rbd_dev_mapping_clear(rbd_dev); + rbd_free_disk(rbd_dev); + if (!single_major) + unregister_blkdev(rbd_dev->major, rbd_dev->name); +} + /* * rbd_dev->header_rwsem must be locked for write and will be unlocked * upon return. @@ -5908,26 +5990,13 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); - dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); - ret = device_add(&rbd_dev->dev); + ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); if (ret) goto err_out_mapping; - /* Everything's ready. Announce the disk to the world. */ - set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); up_write(&rbd_dev->header_rwsem); - - spin_lock(&rbd_dev_list_lock); - list_add_tail(&rbd_dev->node, &rbd_dev_list); - spin_unlock(&rbd_dev_list_lock); - - add_disk(rbd_dev->disk); - pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name, - (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT, - rbd_dev->header.features); - - return ret; + return 0; err_out_mapping: rbd_dev_mapping_clear(rbd_dev); @@ -5962,11 +6031,11 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev) static void rbd_dev_image_release(struct rbd_device *rbd_dev) { rbd_dev_unprobe(rbd_dev); + if (rbd_dev->opts) + rbd_unregister_watch(rbd_dev); rbd_dev->image_format = 0; kfree(rbd_dev->spec->image_id); rbd_dev->spec->image_id = NULL; - - rbd_dev_destroy(rbd_dev); } /* @@ -6126,22 +6195,43 @@ static ssize_t do_rbd_add(struct bus_type *bus, rbd_dev->mapping.read_only = read_only; rc = rbd_dev_device_setup(rbd_dev); - if (rc) { - /* - * rbd_unregister_watch() can't be moved into - * rbd_dev_image_release() without refactoring, see - * commit 1f3ef78861ac. - */ - rbd_unregister_watch(rbd_dev); - rbd_dev_image_release(rbd_dev); - goto out; + if (rc) + goto err_out_image_probe; + + if (rbd_dev->opts->exclusive) { + rc = rbd_add_acquire_lock(rbd_dev); + if (rc) + goto err_out_device_setup; } + /* Everything's ready. Announce the disk to the world. */ + + rc = device_add(&rbd_dev->dev); + if (rc) + goto err_out_image_lock; + + add_disk(rbd_dev->disk); + /* see rbd_init_disk() */ + blk_put_queue(rbd_dev->disk->queue); + + spin_lock(&rbd_dev_list_lock); + list_add_tail(&rbd_dev->node, &rbd_dev_list); + spin_unlock(&rbd_dev_list_lock); + + pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name, + (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT, + rbd_dev->header.features); rc = count; out: module_put(THIS_MODULE); return rc; +err_out_image_lock: + rbd_dev_image_unlock(rbd_dev); +err_out_device_setup: + rbd_dev_device_release(rbd_dev); +err_out_image_probe: + rbd_dev_image_release(rbd_dev); err_out_rbd_dev: rbd_dev_destroy(rbd_dev); err_out_client: @@ -6169,21 +6259,6 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, return do_rbd_add(bus, buf, count); } -static void rbd_dev_device_release(struct rbd_device *rbd_dev) -{ - rbd_free_disk(rbd_dev); - - spin_lock(&rbd_dev_list_lock); - list_del_init(&rbd_dev->node); - spin_unlock(&rbd_dev_list_lock); - - clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); - device_del(&rbd_dev->dev); - rbd_dev_mapping_clear(rbd_dev); - if (!single_major) - unregister_blkdev(rbd_dev->major, rbd_dev->name); -} - static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) { while (rbd_dev->parent) { @@ -6201,6 +6276,7 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) } rbd_assert(second); rbd_dev_image_release(second); + rbd_dev_destroy(second); first->parent = NULL; first->parent_overlap = 0; @@ -6269,21 +6345,16 @@ static ssize_t do_rbd_remove(struct bus_type *bus, blk_set_queue_dying(rbd_dev->disk->queue); } - down_write(&rbd_dev->lock_rwsem); - if (__rbd_is_lock_owner(rbd_dev)) - rbd_unlock(rbd_dev); - up_write(&rbd_dev->lock_rwsem); - rbd_unregister_watch(rbd_dev); + del_gendisk(rbd_dev->disk); + spin_lock(&rbd_dev_list_lock); + list_del_init(&rbd_dev->node); + spin_unlock(&rbd_dev_list_lock); + device_del(&rbd_dev->dev); - /* - * Don't free anything from rbd_dev->disk until after all - * notifies are completely processed. Otherwise - * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting - * in a potential use after free of rbd_dev->disk or rbd_dev. - */ + rbd_dev_image_unlock(rbd_dev); rbd_dev_device_release(rbd_dev); rbd_dev_image_release(rbd_dev); - + rbd_dev_destroy(rbd_dev); return count; } |