aboutsummaryrefslogtreecommitdiff
path: root/drivers/block/drbd/drbd_req.c
diff options
context:
space:
mode:
authorLars Ellenberg <lars.ellenberg@linbit.com>2011-11-28 15:04:49 +0100
committerPhilipp Reisner <philipp.reisner@linbit.com>2012-11-08 16:58:35 +0100
commitb6dd1a89767bc33e9c98b3195f8925b46c5c95f3 (patch)
treee82371062171f5cade79cb0c4a6cd22486b5f082 /drivers/block/drbd/drbd_req.c
parentd5b27b01f17ef1f0badc45f9eea521be3457c9cb (diff)
downloadvexpress-lsk-b6dd1a89767bc33e9c98b3195f8925b46c5c95f3.tar.gz
drbd: remove struct drbd_tl_epoch objects (barrier works)
cherry-picked and adapted from drbd 9 devel branch DRBD requests (struct drbd_request) are already on the per resource transfer log list, and carry their epoch number. We do not need to additionally link them on other ring lists in other structs. The drbd sender thread can recognize itself when to send a P_BARRIER, by tracking the currently processed epoch, and how many writes have been processed for that epoch. If the epoch of the request to be processed does not match the currently processed epoch, any writes have been processed in it, a P_BARRIER for this last processed epoch is send out first. The new epoch then becomes the currently processed epoch. To not get stuck in drbd_al_begin_io() waiting for P_BARRIER_ACK, the sender thread also needs to handle the case when the current epoch was closed already, but no new requests are queued yet, and send out P_BARRIER as soon as possible. This is done by comparing the per resource "current transfer log epoch" (tconn->current_tle_nr) with the per connection "currently processed epoch number" (tconn->send.current_epoch_nr), while waiting for new requests to be processed in wait_for_work(). Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com> Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Diffstat (limited to 'drivers/block/drbd/drbd_req.c')
-rw-r--r--drivers/block/drbd/drbd_req.c157
1 files changed, 43 insertions, 114 deletions
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index e609557a942..ca28b56b7a2 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -149,46 +149,16 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
drbd_req_free(req);
}
-static void queue_barrier(struct drbd_conf *mdev)
-{
- struct drbd_tl_epoch *b;
- struct drbd_tconn *tconn = mdev->tconn;
-
- /* We are within the req_lock. Once we queued the barrier for sending,
- * we set the CREATE_BARRIER bit. It is cleared as soon as a new
- * barrier/epoch object is added. This is the only place this bit is
- * set. It indicates that the barrier for this epoch is already queued,
- * and no new epoch has been created yet. */
- if (test_bit(CREATE_BARRIER, &tconn->flags))
- return;
-
- b = tconn->newest_tle;
- b->w.cb = w_send_barrier;
- b->w.mdev = mdev;
- /* inc_ap_pending done here, so we won't
- * get imbalanced on connection loss.
- * dec_ap_pending will be done in got_BarrierAck
- * or (on connection loss) in tl_clear. */
- inc_ap_pending(mdev);
- drbd_queue_work(&tconn->sender_work, &b->w);
- set_bit(CREATE_BARRIER, &tconn->flags);
+static void wake_all_senders(struct drbd_tconn *tconn) {
+ wake_up(&tconn->sender_work.q_wait);
}
-static void _about_to_complete_local_write(struct drbd_conf *mdev,
- struct drbd_request *req)
+/* must hold resource->req_lock */
+static void start_new_tl_epoch(struct drbd_tconn *tconn)
{
- const unsigned long s = req->rq_state;
-
- /* Before we can signal completion to the upper layers,
- * we may need to close the current epoch.
- * We can skip this, if this request has not even been sent, because we
- * did not have a fully established connection yet/anymore, during
- * bitmap exchange, or while we are C_AHEAD due to congestion policy.
- */
- if (mdev->state.conn >= C_CONNECTED &&
- (s & RQ_NET_SENT) != 0 &&
- req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
- queue_barrier(mdev);
+ tconn->current_tle_writes = 0;
+ atomic_inc(&tconn->current_tle_nr);
+ wake_all_senders(tconn);
}
void complete_master_bio(struct drbd_conf *mdev,
@@ -320,9 +290,16 @@ void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m)
} else if (!(s & RQ_POSTPONED))
D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
- /* for writes we need to do some extra housekeeping */
- if (rw == WRITE)
- _about_to_complete_local_write(mdev, req);
+ /* Before we can signal completion to the upper layers,
+ * we may need to close the current transfer log epoch.
+ * We are within the request lock, so we can simply compare
+ * the request epoch number with the current transfer log
+ * epoch number. If they match, increase the current_tle_nr,
+ * and reset the transfer log epoch write_cnt.
+ */
+ if (rw == WRITE &&
+ req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
+ start_new_tl_epoch(mdev->tconn);
/* Update disk stats */
_drbd_end_io_acct(mdev, req);
@@ -514,15 +491,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
* hurting performance. */
set_bit(UNPLUG_REMOTE, &mdev->flags);
- /* see __drbd_make_request,
- * just after it grabs the req_lock */
- D_ASSERT(test_bit(CREATE_BARRIER, &mdev->tconn->flags) == 0);
-
- req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
-
- /* increment size of current epoch */
- mdev->tconn->newest_tle->n_writes++;
-
/* queue work item to send data */
D_ASSERT(req->rq_state & RQ_NET_PENDING);
req->rq_state |= RQ_NET_QUEUED;
@@ -534,8 +502,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
nc = rcu_dereference(mdev->tconn->net_conf);
p = nc->max_epoch_size;
rcu_read_unlock();
- if (mdev->tconn->newest_tle->n_writes >= p)
- queue_barrier(mdev);
+ if (mdev->tconn->current_tle_writes >= p)
+ start_new_tl_epoch(mdev->tconn);
break;
@@ -692,6 +660,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
During connection handshake, we ensure that the peer was not rebooted. */
if (!(req->rq_state & RQ_NET_OK)) {
if (req->w.cb) {
+ /* w.cb expected to be w_send_dblock, or w_send_read_req */
drbd_queue_work(&mdev->tconn->sender_work, &req->w);
rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
}
@@ -708,7 +677,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
* this is bad, because if the connection is lost now,
* we won't be able to clean them up... */
dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
- list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
}
if ((req->rq_state & RQ_NET_MASK) != 0) {
req->rq_state |= RQ_NET_DONE;
@@ -835,7 +803,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
const int rw = bio_rw(bio);
const int size = bio->bi_size;
const sector_t sector = bio->bi_sector;
- struct drbd_tl_epoch *b = NULL;
struct drbd_request *req;
struct net_conf *nc;
int local, remote, send_oos = 0;
@@ -916,24 +883,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
goto fail_free_complete;
}
- /* For WRITE request, we have to make sure that we have an
- * unused_spare_tle, in case we need to start a new epoch.
- * I try to be smart and avoid to pre-allocate always "just in case",
- * but there is a race between testing the bit and pointer outside the
- * spinlock, and grabbing the spinlock.
- * if we lost that race, we retry. */
- if (rw == WRITE && (remote || send_oos) &&
- mdev->tconn->unused_spare_tle == NULL &&
- test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
-allocate_barrier:
- b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
- if (!b) {
- dev_err(DEV, "Failed to alloc barrier.\n");
- err = -ENOMEM;
- goto fail_free_complete;
- }
- }
-
/* GOOD, everything prepared, grab the spin_lock */
spin_lock_irq(&mdev->tconn->req_lock);
@@ -969,42 +918,9 @@ allocate_barrier:
}
}
- if (b && mdev->tconn->unused_spare_tle == NULL) {
- mdev->tconn->unused_spare_tle = b;
- b = NULL;
- }
- if (rw == WRITE && (remote || send_oos) &&
- mdev->tconn->unused_spare_tle == NULL &&
- test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
- /* someone closed the current epoch
- * while we were grabbing the spinlock */
- spin_unlock_irq(&mdev->tconn->req_lock);
- goto allocate_barrier;
- }
-
-
/* Update disk stats */
_drbd_start_io_acct(mdev, req, bio);
- /* _maybe_start_new_epoch(mdev);
- * If we need to generate a write barrier packet, we have to add the
- * new epoch (barrier) object, and queue the barrier packet for sending,
- * and queue the req's data after it _within the same lock_, otherwise
- * we have race conditions were the reorder domains could be mixed up.
- *
- * Even read requests may start a new epoch and queue the corresponding
- * barrier packet. To get the write ordering right, we only have to
- * make sure that, if this is a write request and it triggered a
- * barrier packet, this request is queued within the same spinlock. */
- if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
- test_and_clear_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
- _tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
- mdev->tconn->unused_spare_tle = NULL;
- } else {
- D_ASSERT(!(remote && rw == WRITE &&
- test_bit(CREATE_BARRIER, &mdev->tconn->flags)));
- }
-
/* NOTE
* Actually, 'local' may be wrong here already, since we may have failed
* to write to the meta data, and may become wrong anytime because of
@@ -1025,7 +941,12 @@ allocate_barrier:
if (local)
_req_mod(req, TO_BE_SUBMITTED);
- list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests);
+ /* which transfer log epoch does this belong to? */
+ req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
+ if (rw == WRITE)
+ mdev->tconn->current_tle_writes++;
+
+ list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);
/* NOTE remote first: to get the concurrent write detection right,
* we must register the request before start of local IO. */
@@ -1059,7 +980,9 @@ allocate_barrier:
}
if (congested) {
- queue_barrier(mdev); /* last barrier, after mirrored writes */
+ if (mdev->tconn->current_tle_writes)
+ /* start a new epoch for non-mirrored writes */
+ start_new_tl_epoch(mdev->tconn);
if (nc->on_congestion == OC_PULL_AHEAD)
_drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
@@ -1070,7 +993,6 @@ allocate_barrier:
rcu_read_unlock();
spin_unlock_irq(&mdev->tconn->req_lock);
- kfree(b); /* if someone else has beaten us to it... */
if (local) {
req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
@@ -1108,7 +1030,6 @@ fail_and_free_req:
drbd_req_free(req);
dec_ap_bio(mdev);
- kfree(b);
return ret;
}
@@ -1164,12 +1085,23 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct
return limit;
}
+struct drbd_request *find_oldest_request(struct drbd_tconn *tconn)
+{
+ /* Walk the transfer log,
+ * and find the oldest not yet completed request */
+ struct drbd_request *r;
+ list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
+ if (r->rq_state & (RQ_NET_PENDING|RQ_LOCAL_PENDING))
+ return r;
+ }
+ return NULL;
+}
+
void request_timer_fn(unsigned long data)
{
struct drbd_conf *mdev = (struct drbd_conf *) data;
struct drbd_tconn *tconn = mdev->tconn;
struct drbd_request *req; /* oldest request */
- struct list_head *le;
struct net_conf *nc;
unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
unsigned long now;
@@ -1193,16 +1125,13 @@ void request_timer_fn(unsigned long data)
now = jiffies;
spin_lock_irq(&tconn->req_lock);
- le = &tconn->oldest_tle->requests;
- if (list_empty(le)) {
+ req = find_oldest_request(tconn);
+ if (!req) {
spin_unlock_irq(&tconn->req_lock);
mod_timer(&mdev->request_timer, now + et);
return;
}
- le = le->prev;
- req = list_entry(le, struct drbd_request, tl_requests);
-
/* The request is considered timed out, if
* - we have some effective timeout from the configuration,
* with above state restrictions applied,