OpenCores
URL https://opencores.org/ocsvn/or1k_soc_on_altera_embedded_dev_kit/or1k_soc_on_altera_embedded_dev_kit/trunk

Subversion Repositories or1k_soc_on_altera_embedded_dev_kit

[/] [or1k_soc_on_altera_embedded_dev_kit/] [tags/] [linux-2.6/] [linux-2.6.24_or32_unified_v2.3/] [net/] [sunrpc/] [sched.c] - Blame information for rev 8

Details | Compare with Previous | View Log

Line No. Rev Author Line
1 3 xianfeng
/*
2
 * linux/net/sunrpc/sched.c
3
 *
4
 * Scheduling for synchronous and asynchronous RPC requests.
5
 *
6
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
7
 *
8
 * TCP NFS related read + write fixes
9
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10
 */
11
 
12
#include <linux/module.h>
13
 
14
#include <linux/sched.h>
15
#include <linux/interrupt.h>
16
#include <linux/slab.h>
17
#include <linux/mempool.h>
18
#include <linux/smp.h>
19
#include <linux/smp_lock.h>
20
#include <linux/spinlock.h>
21
#include <linux/mutex.h>
22
 
23
#include <linux/sunrpc/clnt.h>
24
 
25
#ifdef RPC_DEBUG
26
#define RPCDBG_FACILITY         RPCDBG_SCHED
27
#define RPC_TASK_MAGIC_ID       0xf00baa
28
#endif
29
 
30
/*
31
 * RPC slabs and memory pools
32
 */
33
#define RPC_BUFFER_MAXSIZE      (2048)
34
#define RPC_BUFFER_POOLSIZE     (8)
35
#define RPC_TASK_POOLSIZE       (8)
36
static struct kmem_cache        *rpc_task_slabp __read_mostly;
37
static struct kmem_cache        *rpc_buffer_slabp __read_mostly;
38
static mempool_t        *rpc_task_mempool __read_mostly;
39
static mempool_t        *rpc_buffer_mempool __read_mostly;
40
 
41
static void                     __rpc_default_timer(struct rpc_task *task);
42
static void                     rpc_async_schedule(struct work_struct *);
43
static void                      rpc_release_task(struct rpc_task *task);
44
 
45
/*
46
 * RPC tasks sit here while waiting for conditions to improve.
47
 */
48
static RPC_WAITQ(delay_queue, "delayq");
49
 
50
/*
51
 * rpciod-related stuff
52
 */
53
struct workqueue_struct *rpciod_workqueue;
54
 
55
/*
56
 * Disable the timer for a given RPC task. Should be called with
57
 * queue->lock and bh_disabled in order to avoid races within
58
 * rpc_run_timer().
59
 */
60
static inline void
61
__rpc_disable_timer(struct rpc_task *task)
62
{
63
        dprintk("RPC: %5u disabling timer\n", task->tk_pid);
64
        task->tk_timeout_fn = NULL;
65
        task->tk_timeout = 0;
66
}
67
 
68
/*
69
 * Run a timeout function.
70
 * We use the callback in order to allow __rpc_wake_up_task()
71
 * and friends to disable the timer synchronously on SMP systems
72
 * without calling del_timer_sync(). The latter could cause a
73
 * deadlock if called while we're holding spinlocks...
74
 */
75
static void rpc_run_timer(struct rpc_task *task)
76
{
77
        void (*callback)(struct rpc_task *);
78
 
79
        callback = task->tk_timeout_fn;
80
        task->tk_timeout_fn = NULL;
81
        if (callback && RPC_IS_QUEUED(task)) {
82
                dprintk("RPC: %5u running timer\n", task->tk_pid);
83
                callback(task);
84
        }
85
        smp_mb__before_clear_bit();
86
        clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
87
        smp_mb__after_clear_bit();
88
}
89
 
90
/*
91
 * Set up a timer for the current task.
92
 */
93
static inline void
94
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
95
{
96
        if (!task->tk_timeout)
97
                return;
98
 
99
        dprintk("RPC: %5u setting alarm for %lu ms\n",
100
                        task->tk_pid, task->tk_timeout * 1000 / HZ);
101
 
102
        if (timer)
103
                task->tk_timeout_fn = timer;
104
        else
105
                task->tk_timeout_fn = __rpc_default_timer;
106
        set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
107
        mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
108
}
109
 
110
/*
111
 * Delete any timer for the current task. Because we use del_timer_sync(),
112
 * this function should never be called while holding queue->lock.
113
 */
114
static void
115
rpc_delete_timer(struct rpc_task *task)
116
{
117
        if (RPC_IS_QUEUED(task))
118
                return;
119
        if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
120
                del_singleshot_timer_sync(&task->tk_timer);
121
                dprintk("RPC: %5u deleting timer\n", task->tk_pid);
122
        }
123
}
124
 
125
/*
126
 * Add new request to a priority queue.
127
 */
128
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
129
{
130
        struct list_head *q;
131
        struct rpc_task *t;
132
 
133
        INIT_LIST_HEAD(&task->u.tk_wait.links);
134
        q = &queue->tasks[task->tk_priority];
135
        if (unlikely(task->tk_priority > queue->maxpriority))
136
                q = &queue->tasks[queue->maxpriority];
137
        list_for_each_entry(t, q, u.tk_wait.list) {
138
                if (t->tk_cookie == task->tk_cookie) {
139
                        list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
140
                        return;
141
                }
142
        }
143
        list_add_tail(&task->u.tk_wait.list, q);
144
}
145
 
146
/*
147
 * Add new request to wait queue.
148
 *
149
 * Swapper tasks always get inserted at the head of the queue.
150
 * This should avoid many nasty memory deadlocks and hopefully
151
 * improve overall performance.
152
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
153
 */
154
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
155
{
156
        BUG_ON (RPC_IS_QUEUED(task));
157
 
158
        if (RPC_IS_PRIORITY(queue))
159
                __rpc_add_wait_queue_priority(queue, task);
160
        else if (RPC_IS_SWAPPER(task))
161
                list_add(&task->u.tk_wait.list, &queue->tasks[0]);
162
        else
163
                list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
164
        task->u.tk_wait.rpc_waitq = queue;
165
        queue->qlen++;
166
        rpc_set_queued(task);
167
 
168
        dprintk("RPC: %5u added to queue %p \"%s\"\n",
169
                        task->tk_pid, queue, rpc_qname(queue));
170
}
171
 
172
/*
173
 * Remove request from a priority queue.
174
 */
175
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
176
{
177
        struct rpc_task *t;
178
 
179
        if (!list_empty(&task->u.tk_wait.links)) {
180
                t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
181
                list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
182
                list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
183
        }
184
        list_del(&task->u.tk_wait.list);
185
}
186
 
187
/*
188
 * Remove request from queue.
189
 * Note: must be called with spin lock held.
190
 */
191
static void __rpc_remove_wait_queue(struct rpc_task *task)
192
{
193
        struct rpc_wait_queue *queue;
194
        queue = task->u.tk_wait.rpc_waitq;
195
 
196
        if (RPC_IS_PRIORITY(queue))
197
                __rpc_remove_wait_queue_priority(task);
198
        else
199
                list_del(&task->u.tk_wait.list);
200
        queue->qlen--;
201
        dprintk("RPC: %5u removed from queue %p \"%s\"\n",
202
                        task->tk_pid, queue, rpc_qname(queue));
203
}
204
 
205
static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
206
{
207
        queue->priority = priority;
208
        queue->count = 1 << (priority * 2);
209
}
210
 
211
static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
212
{
213
        queue->cookie = cookie;
214
        queue->nr = RPC_BATCH_COUNT;
215
}
216
 
217
static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
218
{
219
        rpc_set_waitqueue_priority(queue, queue->maxpriority);
220
        rpc_set_waitqueue_cookie(queue, 0);
221
}
222
 
223
static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
224
{
225
        int i;
226
 
227
        spin_lock_init(&queue->lock);
228
        for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
229
                INIT_LIST_HEAD(&queue->tasks[i]);
230
        queue->maxpriority = maxprio;
231
        rpc_reset_waitqueue_priority(queue);
232
#ifdef RPC_DEBUG
233
        queue->name = qname;
234
#endif
235
}
236
 
237
void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
238
{
239
        __rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
240
}
241
 
242
/*
 * Initialise a wait queue with a maximum priority of 0, i.e. one that
 * only ever uses its first task bucket.
 */
void
rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
        __rpc_init_priority_wait_queue(queue, qname, 0);
}
EXPORT_SYMBOL(rpc_init_wait_queue);
247
 
248
static int rpc_wait_bit_interruptible(void *word)
249
{
250
        if (signal_pending(current))
251
                return -ERESTARTSYS;
252
        schedule();
253
        return 0;
254
}
255
 
256
#ifdef RPC_DEBUG
/*
 * Stamp a task with the debug magic and hand it the next value of a
 * file-local, atomically incremented pid counter.
 */
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
        static atomic_t rpc_pid;

        task->tk_magic = RPC_TASK_MAGIC_ID;
        task->tk_pid = atomic_inc_return(&rpc_pid);
}
#else
/* Without RPC_DEBUG there is no debug information to record. */
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif
269
 
270
static void rpc_set_active(struct rpc_task *task)
271
{
272
        struct rpc_clnt *clnt;
273
        if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
274
                return;
275
        rpc_task_set_debuginfo(task);
276
        /* Add to global list of all tasks */
277
        clnt = task->tk_client;
278
        if (clnt != NULL) {
279
                spin_lock(&clnt->cl_lock);
280
                list_add_tail(&task->tk_task, &clnt->cl_tasks);
281
                spin_unlock(&clnt->cl_lock);
282
        }
283
}
284
 
285
/*
286
 * Mark an RPC call as having completed by clearing the 'active' bit
287
 */
288
static void rpc_mark_complete_task(struct rpc_task *task)
289
{
290
        smp_mb__before_clear_bit();
291
        clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
292
        smp_mb__after_clear_bit();
293
        wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
294
}
295
 
296
/*
297
 * Allow callers to wait for completion of an RPC call
298
 */
299
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
300
{
301
        if (action == NULL)
302
                action = rpc_wait_bit_interruptible;
303
        return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
304
                        action, TASK_INTERRUPTIBLE);
305
}
306
EXPORT_SYMBOL(__rpc_wait_for_completion_task);
307
 
308
/*
309
 * Make an RPC task runnable.
310
 *
311
 * Note: If the task is ASYNC, this must be called with
312
 * the spinlock held to protect the wait queue operation.
313
 */
314
static void rpc_make_runnable(struct rpc_task *task)
315
{
316
        BUG_ON(task->tk_timeout_fn);
317
        rpc_clear_queued(task);
318
        if (rpc_test_and_set_running(task))
319
                return;
320
        /* We might have raced */
321
        if (RPC_IS_QUEUED(task)) {
322
                rpc_clear_running(task);
323
                return;
324
        }
325
        if (RPC_IS_ASYNC(task)) {
326
                int status;
327
 
328
                INIT_WORK(&task->u.tk_work, rpc_async_schedule);
329
                status = queue_work(task->tk_workqueue, &task->u.tk_work);
330
                if (status < 0) {
331
                        printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
332
                        task->tk_status = status;
333
                        return;
334
                }
335
        } else
336
                wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
337
}
338
 
339
/*
340
 * Prepare for sleeping on a wait queue.
341
 * By always appending tasks to the list we ensure FIFO behavior.
342
 * NB: An RPC task will only receive interrupt-driven events as long
343
 * as it's on a wait queue.
344
 */
345
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
346
                        rpc_action action, rpc_action timer)
347
{
348
        dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
349
                        task->tk_pid, rpc_qname(q), jiffies);
350
 
351
        if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
352
                printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
353
                return;
354
        }
355
 
356
        __rpc_add_wait_queue(q, task);
357
 
358
        BUG_ON(task->tk_callback != NULL);
359
        task->tk_callback = action;
360
        __rpc_add_timer(task, timer);
361
}
362
 
363
/*
 * Locked front end to __rpc_sleep_on(): activates the task if that has
 * not happened yet, then queues it on @q with q->lock held and bottom
 * halves disabled.
 */
void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                                rpc_action action, rpc_action timer)
{
        /* Activation is a no-op if the task is already active. */
        rpc_set_active(task);

        /* The queue manipulation itself must be done under the lock. */
        spin_lock_bh(&q->lock);
        __rpc_sleep_on(q, task, action, timer);
        spin_unlock_bh(&q->lock);
}
376
 
377
/**
378
 * __rpc_do_wake_up_task - wake up a single rpc_task
379
 * @task: task to be woken up
380
 *
381
 * Caller must hold queue->lock, and have cleared the task queued flag.
382
 */
383
static void __rpc_do_wake_up_task(struct rpc_task *task)
384
{
385
        dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
386
                        task->tk_pid, jiffies);
387
 
388
#ifdef RPC_DEBUG
389
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
390
#endif
391
        /* Has the task been executed yet? If not, we cannot wake it up! */
392
        if (!RPC_IS_ACTIVATED(task)) {
393
                printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
394
                return;
395
        }
396
 
397
        __rpc_disable_timer(task);
398
        __rpc_remove_wait_queue(task);
399
 
400
        rpc_make_runnable(task);
401
 
402
        dprintk("RPC:       __rpc_wake_up_task done\n");
403
}
404
 
405
/*
 * Wake up the specified task.  Unlike rpc_wake_up_task(), this takes no
 * lock itself; it only brackets the wakeup with
 * rpc_start_wakeup()/rpc_finish_wakeup().
 */
static void __rpc_wake_up_task(struct rpc_task *task)
{
        /* Another wakeup of this task is already in progress. */
        if (!rpc_start_wakeup(task))
                return;

        if (RPC_IS_QUEUED(task))
                __rpc_do_wake_up_task(task);
        rpc_finish_wakeup(task);
}
416
 
417
/*
418
 * Default timeout handler if none specified by user
419
 */
420
static void
421
__rpc_default_timer(struct rpc_task *task)
422
{
423
        dprintk("RPC: %5u timeout (default timer)\n", task->tk_pid);
424
        task->tk_status = -ETIMEDOUT;
425
        rpc_wake_up_task(task);
426
}
427
 
428
/*
429
 * Wake up the specified task
430
 */
431
void rpc_wake_up_task(struct rpc_task *task)
432
{
433
        rcu_read_lock_bh();
434
        if (rpc_start_wakeup(task)) {
435
                if (RPC_IS_QUEUED(task)) {
436
                        struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
437
 
438
                        /* Note: we're already in a bh-safe context */
439
                        spin_lock(&queue->lock);
440
                        __rpc_do_wake_up_task(task);
441
                        spin_unlock(&queue->lock);
442
                }
443
                rpc_finish_wakeup(task);
444
        }
445
        rcu_read_unlock_bh();
446
}
447
 
448
/*
449
 * Wake up the next task on a priority queue.
450
 */
451
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
452
{
453
        struct list_head *q;
454
        struct rpc_task *task;
455
 
456
        /*
457
         * Service a batch of tasks from a single cookie.
458
         */
459
        q = &queue->tasks[queue->priority];
460
        if (!list_empty(q)) {
461
                task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
462
                if (queue->cookie == task->tk_cookie) {
463
                        if (--queue->nr)
464
                                goto out;
465
                        list_move_tail(&task->u.tk_wait.list, q);
466
                }
467
                /*
468
                 * Check if we need to switch queues.
469
                 */
470
                if (--queue->count)
471
                        goto new_cookie;
472
        }
473
 
474
        /*
475
         * Service the next queue.
476
         */
477
        do {
478
                if (q == &queue->tasks[0])
479
                        q = &queue->tasks[queue->maxpriority];
480
                else
481
                        q = q - 1;
482
                if (!list_empty(q)) {
483
                        task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
484
                        goto new_queue;
485
                }
486
        } while (q != &queue->tasks[queue->priority]);
487
 
488
        rpc_reset_waitqueue_priority(queue);
489
        return NULL;
490
 
491
new_queue:
492
        rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
493
new_cookie:
494
        rpc_set_waitqueue_cookie(queue, task->tk_cookie);
495
out:
496
        __rpc_wake_up_task(task);
497
        return task;
498
}
499
 
500
/*
501
 * Wake up the next task on the wait queue.
502
 */
503
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
504
{
505
        struct rpc_task *task = NULL;
506
 
507
        dprintk("RPC:       wake_up_next(%p \"%s\")\n",
508
                        queue, rpc_qname(queue));
509
        rcu_read_lock_bh();
510
        spin_lock(&queue->lock);
511
        if (RPC_IS_PRIORITY(queue))
512
                task = __rpc_wake_up_next_priority(queue);
513
        else {
514
                task_for_first(task, &queue->tasks[0])
515
                        __rpc_wake_up_task(task);
516
        }
517
        spin_unlock(&queue->lock);
518
        rcu_read_unlock_bh();
519
 
520
        return task;
521
}
522
 
523
/**
524
 * rpc_wake_up - wake up all rpc_tasks
525
 * @queue: rpc_wait_queue on which the tasks are sleeping
526
 *
527
 * Grabs queue->lock
528
 */
529
void rpc_wake_up(struct rpc_wait_queue *queue)
530
{
531
        struct rpc_task *task, *next;
532
        struct list_head *head;
533
 
534
        rcu_read_lock_bh();
535
        spin_lock(&queue->lock);
536
        head = &queue->tasks[queue->maxpriority];
537
        for (;;) {
538
                list_for_each_entry_safe(task, next, head, u.tk_wait.list)
539
                        __rpc_wake_up_task(task);
540
                if (head == &queue->tasks[0])
541
                        break;
542
                head--;
543
        }
544
        spin_unlock(&queue->lock);
545
        rcu_read_unlock_bh();
546
}
547
 
548
/**
549
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
550
 * @queue: rpc_wait_queue on which the tasks are sleeping
551
 * @status: status value to set
552
 *
553
 * Grabs queue->lock
554
 */
555
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
556
{
557
        struct rpc_task *task, *next;
558
        struct list_head *head;
559
 
560
        rcu_read_lock_bh();
561
        spin_lock(&queue->lock);
562
        head = &queue->tasks[queue->maxpriority];
563
        for (;;) {
564
                list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
565
                        task->tk_status = status;
566
                        __rpc_wake_up_task(task);
567
                }
568
                if (head == &queue->tasks[0])
569
                        break;
570
                head--;
571
        }
572
        spin_unlock(&queue->lock);
573
        rcu_read_unlock_bh();
574
}
575
 
576
/*
 * Timeout callback installed by rpc_delay(): simply wakes the task.
 */
static void
__rpc_atrun(struct rpc_task *task)
{
        rpc_wake_up_task(task);
}
580
 
581
/*
582
 * Run a task at a later time
583
 */
584
void rpc_delay(struct rpc_task *task, unsigned long delay)
585
{
586
        task->tk_timeout = delay;
587
        rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
588
}
589
 
590
/*
591
 * Helper to call task->tk_ops->rpc_call_prepare
592
 */
593
static void rpc_prepare_task(struct rpc_task *task)
594
{
595
        lock_kernel();
596
        task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
597
        unlock_kernel();
598
}
599
 
600
/*
601
 * Helper that calls task->tk_ops->rpc_call_done if it exists
602
 */
603
void rpc_exit_task(struct rpc_task *task)
604
{
605
        task->tk_action = NULL;
606
        if (task->tk_ops->rpc_call_done != NULL) {
607
                lock_kernel();
608
                task->tk_ops->rpc_call_done(task, task->tk_calldata);
609
                unlock_kernel();
610
                if (task->tk_action != NULL) {
611
                        WARN_ON(RPC_ASSASSINATED(task));
612
                        /* Always release the RPC slot and buffer memory */
613
                        xprt_release(task);
614
                }
615
        }
616
}
617
EXPORT_SYMBOL(rpc_exit_task);
618
 
619
void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
620
{
621
        if (ops->rpc_release != NULL) {
622
                lock_kernel();
623
                ops->rpc_release(calldata);
624
                unlock_kernel();
625
        }
626
}
627
 
628
/*
629
 * This is the RPC `scheduler' (or rather, the finite state machine).
630
 */
631
static void __rpc_execute(struct rpc_task *task)
632
{
633
        int             status = 0;
634
 
635
        dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
636
                        task->tk_pid, task->tk_flags);
637
 
638
        BUG_ON(RPC_IS_QUEUED(task));
639
 
640
        for (;;) {
641
                /*
642
                 * Garbage collection of pending timers...
643
                 */
644
                rpc_delete_timer(task);
645
 
646
                /*
647
                 * Execute any pending callback.
648
                 */
649
                if (RPC_DO_CALLBACK(task)) {
650
                        /* Define a callback save pointer */
651
                        void (*save_callback)(struct rpc_task *);
652
 
653
                        /*
654
                         * If a callback exists, save it, reset it,
655
                         * call it.
656
                         * The save is needed to stop from resetting
657
                         * another callback set within the callback handler
658
                         * - Dave
659
                         */
660
                        save_callback=task->tk_callback;
661
                        task->tk_callback=NULL;
662
                        save_callback(task);
663
                }
664
 
665
                /*
666
                 * Perform the next FSM step.
667
                 * tk_action may be NULL when the task has been killed
668
                 * by someone else.
669
                 */
670
                if (!RPC_IS_QUEUED(task)) {
671
                        if (task->tk_action == NULL)
672
                                break;
673
                        task->tk_action(task);
674
                }
675
 
676
                /*
677
                 * Lockless check for whether task is sleeping or not.
678
                 */
679
                if (!RPC_IS_QUEUED(task))
680
                        continue;
681
                rpc_clear_running(task);
682
                if (RPC_IS_ASYNC(task)) {
683
                        /* Careful! we may have raced... */
684
                        if (RPC_IS_QUEUED(task))
685
                                return;
686
                        if (rpc_test_and_set_running(task))
687
                                return;
688
                        continue;
689
                }
690
 
691
                /* sync task: sleep here */
692
                dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
693
                /* Note: Caller should be using rpc_clnt_sigmask() */
694
                status = out_of_line_wait_on_bit(&task->tk_runstate,
695
                                RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
696
                                TASK_INTERRUPTIBLE);
697
                if (status == -ERESTARTSYS) {
698
                        /*
699
                         * When a sync task receives a signal, it exits with
700
                         * -ERESTARTSYS. In order to catch any callbacks that
701
                         * clean up after sleeping on some queue, we don't
702
                         * break the loop here, but go around once more.
703
                         */
704
                        dprintk("RPC: %5u got signal\n", task->tk_pid);
705
                        task->tk_flags |= RPC_TASK_KILLED;
706
                        rpc_exit(task, -ERESTARTSYS);
707
                        rpc_wake_up_task(task);
708
                }
709
                rpc_set_running(task);
710
                dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
711
        }
712
 
713
        dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
714
                        task->tk_status);
715
        /* Release all resources associated with the task */
716
        rpc_release_task(task);
717
}
718
 
719
/*
 * User-visible entry point to the scheduler: activate the task, mark
 * it running and drive it through __rpc_execute().
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *       released. In particular note that tk_release() will have
 *       been called, so your task memory may have been freed.
 */
void
rpc_execute(struct rpc_task *task)
{
        rpc_set_active(task);
        rpc_set_running(task);
        __rpc_execute(task);
}
734
 
735
static void rpc_async_schedule(struct work_struct *work)
736
{
737
        __rpc_execute(container_of(work, struct rpc_task, u.tk_work));
738
}
739
 
740
/*
 * Header placed in front of every buffer handed out by rpc_malloc().
 */
struct rpc_buffer {
        size_t  len;    /* total allocation size, header included */
        char    data[]; /* payload returned to the caller */
};
744
 
745
/**
746
 * rpc_malloc - allocate an RPC buffer
747
 * @task: RPC task that will use this buffer
748
 * @size: requested byte size
749
 *
750
 * To prevent rpciod from hanging, this allocator never sleeps,
751
 * returning NULL if the request cannot be serviced immediately.
752
 * The caller can arrange to sleep in a way that is safe for rpciod.
753
 *
754
 * Most requests are 'small' (under 2KiB) and can be serviced from a
755
 * mempool, ensuring that NFS reads and writes can always proceed,
756
 * and that there is good locality of reference for these buffers.
757
 *
758
 * In order to avoid memory starvation triggering more writebacks of
759
 * NFS requests, we avoid using GFP_KERNEL.
760
 */
761
void *rpc_malloc(struct rpc_task *task, size_t size)
762
{
763
        struct rpc_buffer *buf;
764
        gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;
765
 
766
        size += sizeof(struct rpc_buffer);
767
        if (size <= RPC_BUFFER_MAXSIZE)
768
                buf = mempool_alloc(rpc_buffer_mempool, gfp);
769
        else
770
                buf = kmalloc(size, gfp);
771
 
772
        if (!buf)
773
                return NULL;
774
 
775
        buf->len = size;
776
        dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
777
                        task->tk_pid, size, buf);
778
        return &buf->data;
779
}
780
EXPORT_SYMBOL_GPL(rpc_malloc);
781
 
782
/**
783
 * rpc_free - free buffer allocated via rpc_malloc
784
 * @buffer: buffer to free
785
 *
786
 */
787
void rpc_free(void *buffer)
788
{
789
        size_t size;
790
        struct rpc_buffer *buf;
791
 
792
        if (!buffer)
793
                return;
794
 
795
        buf = container_of(buffer, struct rpc_buffer, data);
796
        size = buf->len;
797
 
798
        dprintk("RPC:       freeing buffer of size %zu at %p\n",
799
                        size, buf);
800
 
801
        if (size <= RPC_BUFFER_MAXSIZE)
802
                mempool_free(buf, rpc_buffer_mempool);
803
        else
804
                kfree(buf);
805
}
806
EXPORT_SYMBOL_GPL(rpc_free);
807
 
808
/*
809
 * Creation and deletion of RPC task structures
810
 */
811
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
812
{
813
        memset(task, 0, sizeof(*task));
814
        init_timer(&task->tk_timer);
815
        task->tk_timer.data     = (unsigned long) task;
816
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
817
        atomic_set(&task->tk_count, 1);
818
        task->tk_client = clnt;
819
        task->tk_flags  = flags;
820
        task->tk_ops = tk_ops;
821
        if (tk_ops->rpc_call_prepare != NULL)
822
                task->tk_action = rpc_prepare_task;
823
        task->tk_calldata = calldata;
824
        INIT_LIST_HEAD(&task->tk_task);
825
 
826
        /* Initialize retry counters */
827
        task->tk_garb_retry = 2;
828
        task->tk_cred_retry = 2;
829
 
830
        task->tk_priority = RPC_PRIORITY_NORMAL;
831
        task->tk_cookie = (unsigned long)current;
832
 
833
        /* Initialize workqueue for async tasks */
834
        task->tk_workqueue = rpciod_workqueue;
835
 
836
        if (clnt) {
837
                kref_get(&clnt->cl_kref);
838
                if (clnt->cl_softrtry)
839
                        task->tk_flags |= RPC_TASK_SOFT;
840
                if (!clnt->cl_intr)
841
                        task->tk_flags |= RPC_TASK_NOINTR;
842
        }
843
 
844
        BUG_ON(task->tk_ops == NULL);
845
 
846
        /* starting timestamp */
847
        task->tk_start = jiffies;
848
 
849
        dprintk("RPC:       new task initialized, procpid %u\n",
850
                                task_pid_nr(current));
851
}
852
 
853
static struct rpc_task *
854
rpc_alloc_task(void)
855
{
856
        return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
857
}
858
 
859
static void rpc_free_task(struct rcu_head *rcu)
860
{
861
        struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
862
        dprintk("RPC: %5u freeing task\n", task->tk_pid);
863
        mempool_free(task, rpc_task_mempool);
864
}
865
 
866
/*
867
 * Create a new task for the specified client.
868
 */
869
struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
870
{
871
        struct rpc_task *task;
872
 
873
        task = rpc_alloc_task();
874
        if (!task)
875
                goto out;
876
 
877
        rpc_init_task(task, clnt, flags, tk_ops, calldata);
878
 
879
        dprintk("RPC:       allocated task %p\n", task);
880
        task->tk_flags |= RPC_TASK_DYNAMIC;
881
out:
882
        return task;
883
}
884
 
885
 
886
void rpc_put_task(struct rpc_task *task)
887
{
888
        const struct rpc_call_ops *tk_ops = task->tk_ops;
889
        void *calldata = task->tk_calldata;
890
 
891
        if (!atomic_dec_and_test(&task->tk_count))
892
                return;
893
        /* Release resources */
894
        if (task->tk_rqstp)
895
                xprt_release(task);
896
        if (task->tk_msg.rpc_cred)
897
                rpcauth_unbindcred(task);
898
        if (task->tk_client) {
899
                rpc_release_client(task->tk_client);
900
                task->tk_client = NULL;
901
        }
902
        if (task->tk_flags & RPC_TASK_DYNAMIC)
903
                call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
904
        rpc_release_calldata(tk_ops, calldata);
905
}
906
EXPORT_SYMBOL(rpc_put_task);
907
 
908
static void rpc_release_task(struct rpc_task *task)
909
{
910
#ifdef RPC_DEBUG
911
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
912
#endif
913
        dprintk("RPC: %5u release task\n", task->tk_pid);
914
 
915
        if (!list_empty(&task->tk_task)) {
916
                struct rpc_clnt *clnt = task->tk_client;
917
                /* Remove from client task list */
918
                spin_lock(&clnt->cl_lock);
919
                list_del(&task->tk_task);
920
                spin_unlock(&clnt->cl_lock);
921
        }
922
        BUG_ON (RPC_IS_QUEUED(task));
923
 
924
        /* Synchronously delete any running timer */
925
        rpc_delete_timer(task);
926
 
927
#ifdef RPC_DEBUG
928
        task->tk_magic = 0;
929
#endif
930
        /* Wake up anyone who is waiting for task completion */
931
        rpc_mark_complete_task(task);
932
 
933
        rpc_put_task(task);
934
}
935
 
936
/*
937
 * Kill all tasks for the given client.
938
 * XXX: kill their descendants as well?
939
 */
940
void rpc_killall_tasks(struct rpc_clnt *clnt)
941
{
942
        struct rpc_task *rovr;
943
 
944
 
945
        if (list_empty(&clnt->cl_tasks))
946
                return;
947
        dprintk("RPC:       killing all tasks for client %p\n", clnt);
948
        /*
949
         * Spin lock all_tasks to prevent changes...
950
         */
951
        spin_lock(&clnt->cl_lock);
952
        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
953
                if (! RPC_IS_ACTIVATED(rovr))
954
                        continue;
955
                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
956
                        rovr->tk_flags |= RPC_TASK_KILLED;
957
                        rpc_exit(rovr, -EIO);
958
                        rpc_wake_up_task(rovr);
959
                }
960
        }
961
        spin_unlock(&clnt->cl_lock);
962
}
963
 
964
int rpciod_up(void)
965
{
966
        return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
967
}
968
 
969
void rpciod_down(void)
970
{
971
        module_put(THIS_MODULE);
972
}
973
 
974
/*
975
 * Start up the rpciod workqueue.
976
 */
977
static int rpciod_start(void)
978
{
979
        struct workqueue_struct *wq;
980
 
981
        /*
982
         * Create the rpciod thread and wait for it to start.
983
         */
984
        dprintk("RPC:       creating workqueue rpciod\n");
985
        wq = create_workqueue("rpciod");
986
        rpciod_workqueue = wq;
987
        return rpciod_workqueue != NULL;
988
}
989
 
990
static void rpciod_stop(void)
991
{
992
        struct workqueue_struct *wq = NULL;
993
 
994
        if (rpciod_workqueue == NULL)
995
                return;
996
        dprintk("RPC:       destroying workqueue rpciod\n");
997
 
998
        wq = rpciod_workqueue;
999
        rpciod_workqueue = NULL;
1000
        destroy_workqueue(wq);
1001
}
1002
 
1003
void
1004
rpc_destroy_mempool(void)
1005
{
1006
        rpciod_stop();
1007
        if (rpc_buffer_mempool)
1008
                mempool_destroy(rpc_buffer_mempool);
1009
        if (rpc_task_mempool)
1010
                mempool_destroy(rpc_task_mempool);
1011
        if (rpc_task_slabp)
1012
                kmem_cache_destroy(rpc_task_slabp);
1013
        if (rpc_buffer_slabp)
1014
                kmem_cache_destroy(rpc_buffer_slabp);
1015
}
1016
 
1017
int
1018
rpc_init_mempool(void)
1019
{
1020
        rpc_task_slabp = kmem_cache_create("rpc_tasks",
1021
                                             sizeof(struct rpc_task),
1022
                                             0, SLAB_HWCACHE_ALIGN,
1023
                                             NULL);
1024
        if (!rpc_task_slabp)
1025
                goto err_nomem;
1026
        rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
1027
                                             RPC_BUFFER_MAXSIZE,
1028
                                             0, SLAB_HWCACHE_ALIGN,
1029
                                             NULL);
1030
        if (!rpc_buffer_slabp)
1031
                goto err_nomem;
1032
        rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
1033
                                                    rpc_task_slabp);
1034
        if (!rpc_task_mempool)
1035
                goto err_nomem;
1036
        rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
1037
                                                      rpc_buffer_slabp);
1038
        if (!rpc_buffer_mempool)
1039
                goto err_nomem;
1040
        if (!rpciod_start())
1041
                goto err_nomem;
1042
        return 0;
1043
err_nomem:
1044
        rpc_destroy_mempool();
1045
        return -ENOMEM;
1046
}

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.