OpenCores
URL https://opencores.org/ocsvn/or1k/or1k/trunk

Subversion Repositories or1k

[/] [or1k/] [trunk/] [linux/] [linux-2.4/] [net/] [sunrpc/] [xprt.c] - Blame information for rev 1765

Details | Compare with Previous | View Log

Line No. Rev Author Line
1 1275 phoenix
/*
2
 *  linux/net/sunrpc/xprt.c
3
 *
4
 *  This is a generic RPC call interface supporting congestion avoidance,
5
 *  and asynchronous calls.
6
 *
7
 *  The interface works like this:
8
 *
9
 *  -   When a process places a call, it allocates a request slot if
10
 *      one is available. Otherwise, it sleeps on the backlog queue
11
 *      (xprt_reserve).
12
 *  -   Next, the caller puts together the RPC message, stuffs it into
13
 *      the request struct, and calls xprt_call().
14
 *  -   xprt_call transmits the message and installs the caller on the
15
 *      socket's wait list. At the same time, it installs a timer that
16
 *      is run after the packet's timeout has expired.
17
 *  -   When a packet arrives, the data_ready handler walks the list of
18
 *      pending requests for that socket. If a matching XID is found, the
19
 *      caller is woken up, and the timer removed.
20
 *  -   When no reply arrives within the timeout interval, the timer is
21
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
22
 *      timeout values (minor timeout) or wakes up the caller with a status
23
 *      of -ETIMEDOUT.
24
 *  -   When the caller receives a notification from RPC that a reply arrived,
25
 *      it should release the RPC slot, and process the reply.
26
 *      If the call timed out, it may choose to retry the operation by
27
 *      adjusting the initial timeout value, and simply calling rpc_call
28
 *      again.
29
 *
30
 *  Support for async RPC is done through a set of RPC-specific scheduling
31
 *  primitives that `transparently' work for processes as well as async
32
 *  tasks that rely on callbacks.
33
 *
34
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
35
 *
36
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
37
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
38
 *  TCP NFS related read + write fixes
39
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
40
 *
41
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
42
 *  Fix behaviour when socket buffer is full.
43
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
44
 */
45
 
46
#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#include <asm/uaccess.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/* Maximum retransmit backoff exponent */
#define XPRT_MAX_BACKOFF        (8)

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void     do_xprt_transmit(struct rpc_task *);
static inline void      do_xprt_reserve(struct rpc_task *);
static void     xprt_disconnect(struct rpc_xprt *);
static void     xprt_connect_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *, int);
static int      xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
#ifdef RPC_DEBUG_DATA
94
/*
95
 * Print the buffer contents (first 128 bytes only--just enough for
96
 * diropres return).
97
 */
98
static void
99
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
100
{
101
        u8      *buf = (u8 *) packet;
102
        int     j;
103
 
104
        dprintk("RPC:      %s\n", msg);
105
        for (j = 0; j < count && j < 128; j += 4) {
106
                if (!(j & 31)) {
107
                        if (j)
108
                                dprintk("\n");
109
                        dprintk("0x%04x ", j);
110
                }
111
                dprintk("%02x%02x%02x%02x ",
112
                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
113
        }
114
        dprintk("\n");
115
}
116
#else
117
static inline void
118
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
119
{
120
        /* NOP */
121
}
122
#endif
123
 
124
/*
125
 * Look up RPC transport given an INET socket
126
 */
127
static inline struct rpc_xprt *
128
xprt_from_sock(struct sock *sk)
129
{
130
        return (struct rpc_xprt *) sk->user_data;
131
}
132
 
133
/*
134
 * Serialize write access to sockets, in order to prevent different
135
 * requests from interfering with each other.
136
 * Also prevents TCP socket connections from colliding with writes.
137
 */
138
static int
139
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
140
{
141
        struct rpc_rqst *req = task->tk_rqstp;
142
        if (!xprt->snd_task) {
143
                if (xprt->nocong || __xprt_get_cong(xprt, task)) {
144
                        xprt->snd_task = task;
145
                        if (req) {
146
                                req->rq_bytes_sent = 0;
147
                                req->rq_ntrans++;
148
                        }
149
                }
150
        }
151
        if (xprt->snd_task != task) {
152
                dprintk("RPC: %4d TCP write queue full\n", task->tk_pid);
153
                task->tk_timeout = 0;
154
                task->tk_status = -EAGAIN;
155
                if (req && req->rq_ntrans)
156
                        rpc_sleep_on(&xprt->resend, task, NULL, NULL);
157
                else
158
                        rpc_sleep_on(&xprt->sending, task, NULL, NULL);
159
        }
160
        return xprt->snd_task == task;
161
}
162
 
163
static inline int
164
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
165
{
166
        int retval;
167
        spin_lock_bh(&xprt->sock_lock);
168
        retval = __xprt_lock_write(xprt, task);
169
        spin_unlock_bh(&xprt->sock_lock);
170
        return retval;
171
}
172
 
173
static void
174
__xprt_lock_write_next(struct rpc_xprt *xprt)
175
{
176
        struct rpc_task *task;
177
 
178
        if (xprt->snd_task)
179
                return;
180
        task = rpc_wake_up_next(&xprt->resend);
181
        if (!task) {
182
                if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
183
                        return;
184
                task = rpc_wake_up_next(&xprt->sending);
185
                if (!task)
186
                        return;
187
        }
188
        if (xprt->nocong || __xprt_get_cong(xprt, task)) {
189
                struct rpc_rqst *req = task->tk_rqstp;
190
                xprt->snd_task = task;
191
                if (req) {
192
                        req->rq_bytes_sent = 0;
193
                        req->rq_ntrans++;
194
                }
195
        }
196
}
197
 
198
/*
199
 * Releases the socket for use by other requests.
200
 */
201
static void
202
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
203
{
204
        if (xprt->snd_task == task)
205
                xprt->snd_task = NULL;
206
        __xprt_lock_write_next(xprt);
207
}
208
 
209
static inline void
210
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
211
{
212
        spin_lock_bh(&xprt->sock_lock);
213
        __xprt_release_write(xprt, task);
214
        spin_unlock_bh(&xprt->sock_lock);
215
}
216
 
217
/*
 * Write data to socket.
 *
 * Kmaps the request's send buffer into a local iovec and pushes it to
 * the socket with sock_sendmsg(), skipping bytes already transmitted
 * (req->rq_bytes_sent).  Returns bytes sent or a negative errno; on
 * stream transports, connection-loss errors are folded into -ENOTCONN.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        struct socket   *sock = xprt->sock;
        struct msghdr   msg;
        struct xdr_buf  *xdr = &req->rq_snd_buf;
        struct iovec    niv[MAX_IOVEC];
        unsigned int    niov, slen, skip;
        mm_segment_t    oldfs;
        int             result;

        if (!sock)
                return -ENOTCONN;

        xprt_pktdump("packet data:",
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);

        /* Dont repeat bytes */
        skip = req->rq_bytes_sent;
        slen = xdr->len - skip;
        /* Allow sock_sendmsg to take kernel-space iovecs */
        oldfs = get_fs(); set_fs(get_ds());
        do {
                unsigned int slen_part, n;

                niov = xdr_kmap(niv, xdr, skip);
                if (!niov) {
                        result = -EAGAIN;
                        break;
                }

                msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
                msg.msg_iov     = niv;
                msg.msg_iovlen  = niov;
                msg.msg_name    = (struct sockaddr *) &xprt->addr;
                msg.msg_namelen = sizeof(xprt->addr);
                msg.msg_control = NULL;
                msg.msg_controllen = 0;

                slen_part = 0;
                for (n = 0; n < niov; n++)
                        slen_part += niv[n].iov_len;

                clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
                result = sock_sendmsg(sock, &msg, slen_part);

                xdr_kunmap(xdr, skip, niov);

                skip += slen_part;
                slen -= slen_part;
        } while (result >= 0 && slen);
        set_fs(oldfs);

        dprintk("RPC:      xprt_sendmsg(%d) = %d\n", slen, result);

        if (result >= 0)
                return result;

        switch (result) {
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED.
                 */
        case -EAGAIN:
                break;
        case -ECONNRESET:
        case -ENOTCONN:
        case -EPIPE:
                /* connection broken */
                if (xprt->stream)
                        result = -ENOTCONN;
                break;
        default:
                printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
        }
        return result;
}
/*
299
 * Van Jacobson congestion avoidance. Check if the congestion window
300
 * overflowed. Put the task to sleep if this is the case.
301
 */
302
static int
303
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
304
{
305
        struct rpc_rqst *req = task->tk_rqstp;
306
 
307
        if (req->rq_cong)
308
                return 1;
309
        dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
310
                        task->tk_pid, xprt->cong, xprt->cwnd);
311
        if (RPCXPRT_CONGESTED(xprt))
312
                return 0;
313
        req->rq_cong = 1;
314
        xprt->cong += RPC_CWNDSCALE;
315
        return 1;
316
}
317
 
318
/*
319
 * Adjust the congestion window, and wake up the next task
320
 * that has been sleeping due to congestion
321
 */
322
static void
323
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
324
{
325
        if (!req->rq_cong)
326
                return;
327
        req->rq_cong = 0;
328
        xprt->cong -= RPC_CWNDSCALE;
329
        __xprt_lock_write_next(xprt);
330
}
331
 
332
/*
333
 * Adjust RPC congestion window
334
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
335
 */
336
static void
337
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
338
{
339
        unsigned long   cwnd;
340
 
341
        cwnd = xprt->cwnd;
342
        if (result >= 0 && cwnd <= xprt->cong) {
343
                /* The (cwnd >> 1) term makes sure
344
                 * the result gets rounded properly. */
345
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
346
                if (cwnd > RPC_MAXCWND)
347
                        cwnd = RPC_MAXCWND;
348
                __xprt_lock_write_next(xprt);
349
        } else if (result == -ETIMEDOUT) {
350
                cwnd >>= 1;
351
                if (cwnd < RPC_CWNDSCALE)
352
                        cwnd = RPC_CWNDSCALE;
353
        }
354
        dprintk("RPC:      cong %ld, cwnd was %ld, now %ld\n",
355
                        xprt->cong, xprt->cwnd, cwnd);
356
        xprt->cwnd = cwnd;
357
}
358
 
359
/*
360
 * Adjust timeout values etc for next retransmit
361
 */
362
int
363
xprt_adjust_timeout(struct rpc_timeout *to)
364
{
365
        if (to->to_retries > 0) {
366
                if (to->to_exponential)
367
                        to->to_current <<= 1;
368
                else
369
                        to->to_current += to->to_increment;
370
                if (to->to_maxval && to->to_current >= to->to_maxval)
371
                        to->to_current = to->to_maxval;
372
        } else {
373
                if (to->to_exponential)
374
                        to->to_initval <<= 1;
375
                else
376
                        to->to_initval += to->to_increment;
377
                if (to->to_maxval && to->to_initval >= to->to_maxval)
378
                        to->to_initval = to->to_maxval;
379
                to->to_current = to->to_initval;
380
        }
381
 
382
        if (!to->to_current) {
383
                printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
384
                to->to_current = 5 * HZ;
385
        }
386
        pprintk("RPC: %lu %s\n", jiffies,
387
                        to->to_retries? "retrans" : "timeout");
388
        return to->to_retries-- > 0;
389
}
390
 
391
/*
392
 * Close down a transport socket
393
 */
394
static void
395
xprt_close(struct rpc_xprt *xprt)
396
{
397
        struct socket   *sock = xprt->sock;
398
        struct sock     *sk = xprt->inet;
399
 
400
        if (!sk)
401
                return;
402
 
403
        write_lock_bh(&sk->callback_lock);
404
        xprt->inet = NULL;
405
        xprt->sock = NULL;
406
 
407
        sk->user_data    = NULL;
408
        sk->data_ready   = xprt->old_data_ready;
409
        sk->state_change = xprt->old_state_change;
410
        sk->write_space  = xprt->old_write_space;
411
        write_unlock_bh(&sk->callback_lock);
412
 
413
        xprt_disconnect(xprt);
414
        sk->no_check     = 0;
415
 
416
        sock_release(sock);
417
}
418
 
419
/*
420
 * Mark a transport as disconnected
421
 */
422
static void
423
xprt_disconnect(struct rpc_xprt *xprt)
424
{
425
        dprintk("RPC:      disconnected transport %p\n", xprt);
426
        spin_lock_bh(&xprt->sock_lock);
427
        xprt_clear_connected(xprt);
428
        rpc_wake_up_status(&xprt->pending, -ENOTCONN);
429
        spin_unlock_bh(&xprt->sock_lock);
430
}
431
 
432
/*
433
 * Reconnect a broken TCP connection.
434
 *
435
 */
436
void
437
xprt_connect(struct rpc_task *task)
438
{
439
        struct rpc_xprt *xprt = task->tk_xprt;
440
        struct socket   *sock = xprt->sock;
441
        struct sock     *inet;
442
        int             status;
443
 
444
        dprintk("RPC: %4d xprt_connect %p connected %d\n",
445
                                task->tk_pid, xprt, xprt_connected(xprt));
446
        if (xprt->shutdown)
447
                return;
448
 
449
        if (!xprt->addr.sin_port) {
450
                task->tk_status = -EIO;
451
                return;
452
        }
453
 
454
        if (!xprt_lock_write(xprt, task))
455
                return;
456
        if (xprt_connected(xprt))
457
                goto out_write;
458
 
459
        if (task->tk_rqstp)
460
                task->tk_rqstp->rq_bytes_sent = 0;
461
 
462
        xprt_close(xprt);
463
        /* Create an unconnected socket */
464
        sock = xprt_create_socket(xprt->prot, &xprt->timeout, xprt->resvport);
465
        if (!sock) {
466
                /* couldn't create socket or bind to reserved port;
467
                 * this is likely a permanent error, so cause an abort */
468
                task->tk_status = -EIO;
469
                goto out_write;
470
        }
471
        xprt_bind_socket(xprt, sock);
472
 
473
        if (!xprt->stream)
474
                goto out_write;
475
 
476
        inet = sock->sk;
477
 
478
        /* Now connect it asynchronously. */
479
        dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
480
        status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
481
                                sizeof(xprt->addr), O_NONBLOCK);
482
        dprintk("RPC: %4d connect status %d connected %d\n",
483
                task->tk_pid, status, xprt_connected(xprt));
484
 
485
        if (status >= 0)
486
                return;
487
 
488
        switch (status) {
489
        case -EALREADY:
490
        case -EINPROGRESS:
491
                /* Protect against TCP socket state changes */
492
                lock_sock(inet);
493
                if (inet->state != TCP_ESTABLISHED) {
494
                        dprintk("RPC: %4d  waiting for connection\n",
495
                                        task->tk_pid);
496
                        task->tk_timeout = RPC_CONNECT_TIMEOUT;
497
                        /* if the socket is already closing, delay briefly */
498
                        if ((1<<inet->state) & ~(TCPF_SYN_SENT|TCPF_SYN_RECV))
499
                                task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
500
                        rpc_sleep_on(&xprt->pending, task, xprt_connect_status,
501
                                        NULL);
502
                }
503
                release_sock(inet);
504
                break;
505
        case -ECONNREFUSED:
506
        case -ECONNRESET:
507
        case -ENOTCONN:
508
                if (!task->tk_client->cl_softrtry) {
509
                        rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
510
                        task->tk_status = -ENOTCONN;
511
                        break;
512
                }
513
        default:
514
                /* Report myriad other possible returns.  If this file
515
                 * system is soft mounted, just error out, like Solaris.  */
516
                if (task->tk_client->cl_softrtry) {
517
                        printk(KERN_WARNING
518
                                        "RPC: error %d connecting to server %s, exiting\n",
519
                                        -status, task->tk_client->cl_server);
520
                        task->tk_status = -EIO;
521
                        goto out_write;
522
                }
523
                printk(KERN_WARNING "RPC: error %d connecting to server %s\n",
524
                                -status, task->tk_client->cl_server);
525
                /* This will prevent anybody else from connecting */
526
                rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
527
                task->tk_status = status;
528
                break;
529
        }
530
        return;
531
 out_write:
532
        xprt_release_write(xprt, task);
533
}
534
 
535
/*
536
 * We arrive here when awoken from waiting on connection establishment.
537
 */
538
static void
539
xprt_connect_status(struct rpc_task *task)
540
{
541
        struct rpc_xprt *xprt = task->tk_xprt;
542
 
543
        if (task->tk_status >= 0) {
544
                dprintk("RPC: %4d xprt_connect_status: connection established\n",
545
                                task->tk_pid);
546
                return;
547
        }
548
 
549
        /* if soft mounted, cause this RPC to fail */
550
        if (task->tk_client->cl_softrtry)
551
                task->tk_status = -EIO;
552
 
553
        switch (task->tk_status) {
554
        case -ENOTCONN:
555
                rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
556
                return;
557
        case -ETIMEDOUT:
558
                dprintk("RPC: %4d xprt_connect_status: timed out\n",
559
                                task->tk_pid);
560
                break;
561
        default:
562
                printk(KERN_ERR "RPC: error %d connecting to server %s\n",
563
                                -task->tk_status, task->tk_client->cl_server);
564
        }
565
        xprt_release_write(xprt, task);
566
}
567
 
568
/*
569
 * Look up the RPC request corresponding to a reply, and then lock it.
570
 */
571
static inline struct rpc_rqst *
572
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
573
{
574
        struct list_head *pos;
575
        struct rpc_rqst *req = NULL;
576
 
577
        list_for_each(pos, &xprt->recv) {
578
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
579
                if (entry->rq_xid == xid) {
580
                        req = entry;
581
                        break;
582
                }
583
        }
584
        return req;
585
}
586
 
587
/*
588
 * Complete reply received.
589
 * The TCP code relies on us to remove the request from xprt->pending.
590
 */
591
static void
592
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
593
{
594
        struct rpc_task *task = req->rq_task;
595
        struct rpc_clnt *clnt = task->tk_client;
596
 
597
        /* Adjust congestion window */
598
        if (!xprt->nocong) {
599
                int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
600
                xprt_adjust_cwnd(xprt, copied);
601
                __xprt_put_cong(xprt, req);
602
                if (req->rq_ntrans == 1) {
603
                        if (timer)
604
                                rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
605
                }
606
                rpc_set_timeo(&clnt->cl_rtt, timer, req->rq_ntrans - 1);
607
        }
608
 
609
#ifdef RPC_PROFILE
610
        /* Profile only reads for now */
611
        if (copied > 1024) {
612
                static unsigned long    nextstat = 0;
613
                static unsigned long    pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;
614
 
615
                pkt_cnt++;
616
                pkt_len += req->rq_slen + copied;
617
                pkt_rtt += jiffies - req->rq_xtime;
618
                if (time_before(nextstat, jiffies)) {
619
                        printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
620
                        printk("RPC: %ld %ld %ld %ld stat\n",
621
                                        jiffies, pkt_cnt, pkt_len, pkt_rtt);
622
                        pkt_rtt = pkt_len = pkt_cnt = 0;
623
                        nextstat = jiffies + 5 * HZ;
624
                }
625
        }
626
#endif
627
 
628
        dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
629
        req->rq_received = copied;
630
        list_del_init(&req->rq_list);
631
 
632
        /* ... and wake up the process. */
633
        rpc_wake_up_task(task);
634
        return;
635
}
636
 
637
static size_t
638
skb_read_bits(skb_reader_t *desc, void *to, size_t len)
639
{
640
        if (len > desc->count)
641
                len = desc->count;
642
        skb_copy_bits(desc->skb, desc->offset, to, len);
643
        desc->count -= len;
644
        desc->offset += len;
645
        return len;
646
}
647
 
648
static size_t
649
skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
650
{
651
        unsigned int csum2, pos;
652
 
653
        if (len > desc->count)
654
                len = desc->count;
655
        pos = desc->offset;
656
        csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
657
        desc->csum = csum_block_add(desc->csum, csum2, pos);
658
        desc->count -= len;
659
        desc->offset += len;
660
        return len;
661
}
662
 
663
/*
664
 * We have set things up such that we perform the checksum of the UDP
665
 * packet in parallel with the copies into the RPC client iovec.  -DaveM
666
 */
667
static int
668
csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
669
{
670
        skb_reader_t desc;
671
 
672
        desc.skb = skb;
673
        desc.offset = sizeof(struct udphdr);
674
        desc.count = skb->len - desc.offset;
675
 
676
        if (skb->ip_summed == CHECKSUM_UNNECESSARY)
677
                goto no_checksum;
678
 
679
        desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
680
        xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
681
        if (desc.offset != skb->len) {
682
                unsigned int csum2;
683
                csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
684
                desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
685
        }
686
        if ((unsigned short)csum_fold(desc.csum))
687
                return -1;
688
        return 0;
689
no_checksum:
690
        xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
691
        return 0;
692
}
693
 
694
/*
695
 * Input handler for RPC replies. Called from a bottom half and hence
696
 * atomic.
697
 */
698
static void
699
udp_data_ready(struct sock *sk, int len)
700
{
701
        struct rpc_task *task;
702
        struct rpc_xprt *xprt;
703
        struct rpc_rqst *rovr;
704
        struct sk_buff  *skb;
705
        int             err, repsize, copied;
706
 
707
        read_lock(&sk->callback_lock);
708
        dprintk("RPC:      udp_data_ready...\n");
709
        if (sk->dead || !(xprt = xprt_from_sock(sk))) {
710
                printk("RPC:      udp_data_ready request not found!\n");
711
                goto out;
712
        }
713
 
714
        dprintk("RPC:      udp_data_ready client %p\n", xprt);
715
 
716
        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
717
                goto out;
718
 
719
        if (xprt->shutdown)
720
                goto dropit;
721
 
722
        repsize = skb->len - sizeof(struct udphdr);
723
        if (repsize < 4) {
724
                printk("RPC: impossible RPC reply size %d!\n", repsize);
725
                goto dropit;
726
        }
727
 
728
        /* Look up and lock the request corresponding to the given XID */
729
        spin_lock(&xprt->sock_lock);
730
        rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
731
        if (!rovr)
732
                goto out_unlock;
733
        task = rovr->rq_task;
734
 
735
        dprintk("RPC: %4d received reply\n", task->tk_pid);
736
        xprt_pktdump("packet data:",
737
                     (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);
738
 
739
        if ((copied = rovr->rq_private_buf.len) > repsize)
740
                copied = repsize;
741
 
742
        /* Suck it into the iovec, verify checksum if not done by hw. */
743
        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
744
                goto out_unlock;
745
 
746
        /* Something worked... */
747
        dst_confirm(skb->dst);
748
 
749
        xprt_complete_rqst(xprt, rovr, copied);
750
 
751
 out_unlock:
752
        spin_unlock(&xprt->sock_lock);
753
 dropit:
754
        skb_free_datagram(sk, skb);
755
 out:
756
        if (sk->sleep && waitqueue_active(sk->sleep))
757
                wake_up_interruptible(sk->sleep);
758
        read_unlock(&sk->callback_lock);
759
}
760
 
761
/*
762
 * Copy from an skb into memory and shrink the skb.
763
 */
764
static inline size_t
765
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
766
{
767
        if (len > desc->count)
768
                len = desc->count;
769
        skb_copy_bits(desc->skb, desc->offset, p, len);
770
        desc->offset += len;
771
        desc->count -= len;
772
        return len;
773
}
774
 
775
/*
776
 * TCP read fragment marker
777
 */
778
static inline void
779
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
780
{
781
        size_t len, used;
782
        char *p;
783
 
784
        p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
785
        len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
786
        used = tcp_copy_data(desc, p, len);
787
        xprt->tcp_offset += used;
788
        if (used != len)
789
                return;
790
        xprt->tcp_reclen = ntohl(xprt->tcp_recm);
791
        if (xprt->tcp_reclen & 0x80000000)
792
                xprt->tcp_flags |= XPRT_LAST_FRAG;
793
        else
794
                xprt->tcp_flags &= ~XPRT_LAST_FRAG;
795
        xprt->tcp_reclen &= 0x7fffffff;
796
        xprt->tcp_flags &= ~XPRT_COPY_RECM;
797
        xprt->tcp_offset = 0;
798
        /* Sanity check of the record length */
799
        if (xprt->tcp_reclen < 4) {
800
                printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
801
                xprt_disconnect(xprt);
802
        }
803
        dprintk("RPC:      reading TCP record fragment of length %d\n",
804
                        xprt->tcp_reclen);
805
}
806
 
807
static void
808
tcp_check_recm(struct rpc_xprt *xprt)
809
{
810
        if (xprt->tcp_offset == xprt->tcp_reclen) {
811
                xprt->tcp_flags |= XPRT_COPY_RECM;
812
                xprt->tcp_offset = 0;
813
                if (xprt->tcp_flags & XPRT_LAST_FRAG) {
814
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
815
                        xprt->tcp_flags |= XPRT_COPY_XID;
816
                        xprt->tcp_copied = 0;
817
                }
818
        }
819
}
820
 
821
/*
822
 * TCP read xid
823
 */
824
static inline void
825
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
826
{
827
        size_t len, used;
828
        char *p;
829
 
830
        len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
831
        dprintk("RPC:      reading XID (%Zu bytes)\n", len);
832
        p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
833
        used = tcp_copy_data(desc, p, len);
834
        xprt->tcp_offset += used;
835
        if (used != len)
836
                return;
837
        xprt->tcp_flags &= ~XPRT_COPY_XID;
838
        xprt->tcp_flags |= XPRT_COPY_DATA;
839
        xprt->tcp_copied = 4;
840
        dprintk("RPC:      reading reply for XID %08x\n", xprt->tcp_xid);
841
        tcp_check_recm(xprt);
842
}
843
 
844
/*
845
 * TCP read and complete request
846
 */
847
static inline void
848
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
849
{
850
        struct rpc_rqst *req;
851
        struct xdr_buf *rcvbuf;
852
        size_t len;
853
 
854
        /* Find and lock the request corresponding to this xid */
855
        spin_lock(&xprt->sock_lock);
856
        req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
857
        if (!req) {
858
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
859
                dprintk("RPC:      XID %08x request not found!\n",
860
                                xprt->tcp_xid);
861
                spin_unlock(&xprt->sock_lock);
862
                return;
863
        }
864
 
865
        rcvbuf = &req->rq_private_buf;
866
        len = desc->count;
867
        if (len > xprt->tcp_reclen - xprt->tcp_offset) {
868
                skb_reader_t my_desc;
869
 
870
                len = xprt->tcp_reclen - xprt->tcp_offset;
871
                memcpy(&my_desc, desc, sizeof(my_desc));
872
                my_desc.count = len;
873
                xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
874
                                          &my_desc, tcp_copy_data);
875
                desc->count -= len;
876
                desc->offset += len;
877
        } else
878
                xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
879
                                          desc, tcp_copy_data);
880
        xprt->tcp_copied += len;
881
        xprt->tcp_offset += len;
882
 
883
        if (xprt->tcp_copied == req->rq_private_buf.len)
884
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
885
        else if (xprt->tcp_offset == xprt->tcp_reclen) {
886
                if (xprt->tcp_flags & XPRT_LAST_FRAG)
887
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
888
        }
889
 
890
        if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
891
                dprintk("RPC: %4d received reply complete\n",
892
                                req->rq_task->tk_pid);
893
                xprt_complete_rqst(xprt, req, xprt->tcp_copied);
894
        }
895
        spin_unlock(&xprt->sock_lock);
896
        tcp_check_recm(xprt);
897
}
898
 
899
/*
900
 * TCP discard extra bytes from a short read
901
 */
902
static inline void
903
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
904
{
905
        size_t len;
906
 
907
        len = xprt->tcp_reclen - xprt->tcp_offset;
908
        if (len > desc->count)
909
                len = desc->count;
910
        desc->count -= len;
911
        desc->offset += len;
912
        xprt->tcp_offset += len;
913
        tcp_check_recm(xprt);
914
}
915
 
916
/*
 * TCP record receive routine
 * We first have to grab the record marker, then the XID, then the data.
 *
 * Called from tcp_read_sock(); dispatches each chunk of the stream to the
 * appropriate stage of the record state machine kept in xprt->tcp_flags.
 */
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
                unsigned int offset, size_t len)
{
        /* rd_desc->buf smuggles the transport pointer in from
         * tcp_data_ready() -- see the comment there. */
        struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
        skb_reader_t desc = { skb, offset, len };

        dprintk("RPC:      tcp_data_recv\n");
        do {
                /* Read in a new fragment marker if necessary */
                /* Can we ever really expect to get completely empty fragments? */
                if (xprt->tcp_flags & XPRT_COPY_RECM) {
                        tcp_read_fraghdr(xprt, &desc);
                        continue;
                }
                /* Read in the xid if necessary */
                if (xprt->tcp_flags & XPRT_COPY_XID) {
                        tcp_read_xid(xprt, &desc);
                        continue;
                }
                /* Read in the request data */
                if (xprt->tcp_flags & XPRT_COPY_DATA) {
                        tcp_read_request(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
                tcp_read_discard(xprt, &desc);
        } while (desc.count);
        dprintk("RPC:      tcp_data_recv done\n");
        /* Report to tcp_read_sock() how many bytes were consumed */
        return len - desc.count;
}
951
 
952
static void tcp_data_ready(struct sock *sk, int bytes)
953
{
954
        struct rpc_xprt *xprt;
955
        read_descriptor_t rd_desc;
956
 
957
        read_lock(&sk->callback_lock);
958
        dprintk("RPC:      tcp_data_ready...\n");
959
        if (!(xprt = xprt_from_sock(sk))) {
960
                printk("RPC:      tcp_data_ready socket info not found!\n");
961
                goto out;
962
        }
963
        if (xprt->shutdown)
964
                goto out;
965
 
966
        /* We use rd_desc to pass struct xprt to tcp_data_recv */
967
        rd_desc.buf = (char *)xprt;
968
        rd_desc.count = 65536;
969
        tcp_read_sock(sk, &rd_desc, tcp_data_recv);
970
out:
971
        read_unlock(&sk->callback_lock);
972
}
973
 
974
/*
 * state_change callback for a TCP socket.  Tracks connection establishment
 * and teardown, resetting the record state machine on a fresh connection.
 * Runs with sk->callback_lock held for reading.
 */
static void
tcp_state_change(struct sock *sk)
{
        struct rpc_xprt *xprt;

        read_lock(&sk->callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC:      tcp_state_change client %p...\n", xprt);
        dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
                                sk->state, xprt_connected(xprt),
                                sk->dead, sk->zapped);

        switch (sk->state) {
        case TCP_ESTABLISHED:
                /* Already marked connected: nothing to (re)initialize */
                if (xprt_test_and_set_connected(xprt))
                        break;

                /* Reset TCP record info */
                xprt->tcp_offset = 0;
                xprt->tcp_reclen = 0;
                xprt->tcp_copied = 0;
                xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;

                /* Kick the task that was waiting on the connect */
                spin_lock_bh(&xprt->sock_lock);
                if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
                        rpc_wake_up_task(xprt->snd_task);
                spin_unlock_bh(&xprt->sock_lock);
                break;
        case TCP_SYN_SENT:
        case TCP_SYN_RECV:
                /* Connect still in progress: wait for a definitive state */
                break;
        default:
                xprt_disconnect(xprt);
                break;
        }
 out:
        /* Preserve the socket's default wakeup semantics for other sleepers */
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible_all(sk->sleep);
        read_unlock(&sk->callback_lock);
}
1015
 
1016
/*
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing sock_sendmsg
 * with a bunch of small requests.
 *
 * Runs with sk->callback_lock held for reading.
 */
static void
xprt_write_space(struct sock *sk)
{
        struct rpc_xprt *xprt;
        struct socket   *sock;

        read_lock(&sk->callback_lock);
        if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
                goto out;
        if (xprt->shutdown)
                goto out;

        /* Wait until we have enough socket memory */
        if (xprt->stream) {
                /* from net/ipv4/tcp.c:tcp_write_space */
                if (tcp_wspace(sk) < tcp_min_write_space(sk))
                        goto out;
        } else {
                /* from net/core/sock.c:sock_def_write_space */
                if (!sock_writeable(sk))
                        goto out;
        }

        /* Only wake a writer if one actually saw ENOSPC earlier */
        if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
                goto out;

        spin_lock_bh(&xprt->sock_lock);
        if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
                rpc_wake_up_task(xprt->snd_task);
        spin_unlock_bh(&xprt->sock_lock);
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible(sk->sleep);
out:
        read_unlock(&sk->callback_lock);
}
1057
 
1058
/*
 * RPC receive timeout handler.
 *
 * Fires when no reply arrived in time: back off the congestion window,
 * release this request's congestion slot, and wake the task with
 * -ETIMEDOUT (unless a reply raced in first).
 */
static void
xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        spin_lock(&xprt->sock_lock);
        /* Reply arrived while the timer was being delivered: just wake up */
        if (req->rq_received)
                goto out;

        xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
        __xprt_put_cong(xprt, req);

        /* NOTE(review): req was already dereferenced above, so the
         * "req ?" test below can never be false here. */
        dprintk("RPC: %4d xprt_timer (%s request)\n",
                task->tk_pid, req ? "pending" : "backlogged");

        task->tk_status  = -ETIMEDOUT;
out:
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
        spin_unlock(&xprt->sock_lock);
}
1083
 
1084
/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 *
 * Prepares the request (record marker for stream transports, receive-list
 * registration) under sock_lock, then hands off to do_xprt_transmit().
 */
void
xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
                                *(u32 *)(req->rq_svec[0].iov_base));

        if (xprt->shutdown)
                task->tk_status = -EIO;

        if (task->tk_status < 0)
                return;

        if (task->tk_rpcwait)
                rpc_remove_wait_queue(task);

        /* set up everything as needed. */
        /* Write the record marker */
        if (xprt->stream) {
                u32     *marker = req->rq_svec[0].iov_base;

                /* High bit flags this as the last fragment of the record */
                *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
        }

        spin_lock_bh(&xprt->sock_lock);
        /* A reply already arrived and nothing was partially sent: no-op */
        if (req->rq_received != 0 && !req->rq_bytes_sent)
                goto out_notrans;

        /* Another task owns the socket for writing right now */
        if (!__xprt_lock_write(xprt, task))
                goto out_notrans;

        if (!xprt_connected(xprt)) {
                task->tk_status = -ENOTCONN;
                goto out_notrans;
        }

        if (list_empty(&req->rq_list)) {
                /* Update the softirq receive buffer */
                memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                sizeof(req->rq_private_buf));
                /* Register so the reply handler can find this request by XID */
                list_add_tail(&req->rq_list, &xprt->recv);
        }
        spin_unlock_bh(&xprt->sock_lock);

        do_xprt_transmit(task);
        return;
out_notrans:
        spin_unlock_bh(&xprt->sock_lock);
}
1139
 
1140
/*
 * Push the request out on the wire, coping with partial sends on stream
 * transports and with writespace callbacks racing against us.  On full
 * transmission, arm the receive timeout and sleep on the pending queue.
 * Caller must hold the write lock on the transport (__xprt_lock_write).
 */
static void
do_xprt_transmit(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status, retry = 0;


        /* Continue transmitting the packet/record. We must be careful
         * to cope with writespace callbacks arriving _after_ we have
         * called xprt_sendmsg().
         */
        while (1) {
                req->rq_xtime = jiffies;
                status = xprt_sendmsg(xprt, req);

                if (status < 0)
                        break;

                if (xprt->stream) {
                        req->rq_bytes_sent += status;

                        /* If we've sent the entire packet, immediately
                         * reset the count of bytes sent. */
                        if (req->rq_bytes_sent >= req->rq_slen) {
                                req->rq_bytes_sent = 0;
                                goto out_receive;
                        }
                } else {
                        /* Datagrams go out whole or not at all */
                        if (status >= req->rq_slen)
                                goto out_receive;
                        status = -EAGAIN;
                        break;
                }

                dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
                                task->tk_pid, req->rq_slen - req->rq_bytes_sent,
                                req->rq_slen);

                status = -EAGAIN;
                if (retry++ > 50)
                        break;
        }

        /* If we're doing a resend and have received a reply already,
         * then exit early.
         * Note, though, that we can't do this if we've already started
         * resending down a TCP stream.
         */
        task->tk_status = status;

        switch (status) {
        case -EAGAIN:
                if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
                        /* Protect against races with xprt_write_space */
                        spin_lock_bh(&xprt->sock_lock);
                        /* Don't race with disconnect */
                        if (!xprt_connected(xprt))
                                task->tk_status = -ENOTCONN;
                        else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
                                /* Sleep until xprt_write_space wakes us */
                                task->tk_timeout = req->rq_timeout.to_current;
                                rpc_sleep_on(&xprt->pending, task, NULL, NULL);
                        }
                        spin_unlock_bh(&xprt->sock_lock);
                        return;
                }
                /* Keep holding the socket if it is blocked */
                rpc_delay(task, HZ>>4);
                return;
        case -ECONNREFUSED:
                task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
                /* fallthrough -- return while still holding the write lock */
        case -ENOTCONN:
                return;
        default:
                if (xprt->stream)
                        xprt_disconnect(xprt);
        }
        xprt_release_write(xprt, task);
        return;
 out_receive:
        dprintk("RPC: %4d xmit complete\n", task->tk_pid);
        spin_lock_bh(&xprt->sock_lock);
        /* Set the task's receive timeout value */
        if (!xprt->nocong) {
                /* Use RTT estimator, scaled up for retransmissions already
                 * used and for the remaining retry budget */
                int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
                task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt, timer);
                task->tk_timeout <<= rpc_ntimeo(&clnt->cl_rtt, timer);
                task->tk_timeout <<= clnt->cl_timeout.to_retries
                        - req->rq_timeout.to_retries;
                if (task->tk_timeout > req->rq_timeout.to_maxval)
                        task->tk_timeout = req->rq_timeout.to_maxval;
        } else
                task->tk_timeout = req->rq_timeout.to_current;
        /* Don't race with disconnect */
        if (!xprt_connected(xprt))
                task->tk_status = -ENOTCONN;
        else if (!req->rq_received)
                rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
        __xprt_release_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
}
1243
 
1244
/*
1245
 * Reserve an RPC call slot.
1246
 */
1247
void
1248
xprt_reserve(struct rpc_task *task)
1249
{
1250
        struct rpc_xprt *xprt = task->tk_xprt;
1251
 
1252
        task->tk_status = -EIO;
1253
        if (!xprt->shutdown) {
1254
                spin_lock(&xprt->xprt_lock);
1255
                do_xprt_reserve(task);
1256
                spin_unlock(&xprt->xprt_lock);
1257
        }
1258
}
1259
 
1260
static inline void
1261
do_xprt_reserve(struct rpc_task *task)
1262
{
1263
        struct rpc_xprt *xprt = task->tk_xprt;
1264
 
1265
        task->tk_status = 0;
1266
        if (task->tk_rqstp)
1267
                return;
1268
        if (xprt->free) {
1269
                struct rpc_rqst *req = xprt->free;
1270
                xprt->free = req->rq_next;
1271
                req->rq_next = NULL;
1272
                task->tk_rqstp = req;
1273
                xprt_request_init(task, xprt);
1274
                return;
1275
        }
1276
        dprintk("RPC:      waiting for request slot\n");
1277
        task->tk_status = -EAGAIN;
1278
        task->tk_timeout = 0;
1279
        rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
1280
}
1281
 
1282
/*
1283
 * Allocate a 'unique' XID
1284
 */
1285
static u32
1286
xprt_alloc_xid(void)
1287
{
1288
        static spinlock_t xid_lock = SPIN_LOCK_UNLOCKED;
1289
        static int need_init = 1;
1290
        static u32 xid;
1291
        u32 ret;
1292
 
1293
        spin_lock(&xid_lock);
1294
        if (unlikely(need_init)) {
1295
                xid = CURRENT_TIME << 12;
1296
                need_init = 0;
1297
        }
1298
        ret = xid++;
1299
        spin_unlock(&xid_lock);
1300
        return ret;
1301
}
1302
 
1303
/*
1304
 * Initialize RPC request
1305
 */
1306
static void
1307
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
1308
{
1309
        struct rpc_rqst *req = task->tk_rqstp;
1310
 
1311
        req->rq_timeout = xprt->timeout;
1312
        req->rq_task    = task;
1313
        req->rq_xprt    = xprt;
1314
        req->rq_xid     = xprt_alloc_xid();
1315
        INIT_LIST_HEAD(&req->rq_list);
1316
        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
1317
                        req, req->rq_xid);
1318
}
1319
 
1320
/*
 * Release an RPC call slot
 *
 * Drops the write lock and congestion slot, unlinks the request from the
 * receive list, scrubs it, and returns it to the free list, waking any
 * backlogged task waiting for a slot.
 */
void
xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        spin_lock_bh(&xprt->sock_lock);
        __xprt_release_write(xprt, task);
        __xprt_put_cong(xprt, req);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        spin_unlock_bh(&xprt->sock_lock);
        task->tk_rqstp = NULL;
        memset(req, 0, sizeof(*req));    /* mark unused */

        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

        /* Return the slot to the free list under the reservation lock */
        spin_lock(&xprt->xprt_lock);
        req->rq_next = xprt->free;
        xprt->free   = req;

        xprt_clear_backlog(xprt);
        spin_unlock(&xprt->xprt_lock);
}
1349
 
1350
/*
1351
 * Set default timeout parameters
1352
 */
1353
void
1354
xprt_default_timeout(struct rpc_timeout *to, int proto)
1355
{
1356
        if (proto == IPPROTO_UDP)
1357
                xprt_set_timeout(to, 5,  5 * HZ);
1358
        else
1359
                xprt_set_timeout(to, 5, 60 * HZ);
1360
}
1361
 
1362
/*
1363
 * Set constant timeout
1364
 */
1365
void
1366
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
1367
{
1368
        to->to_current   =
1369
        to->to_initval   =
1370
        to->to_increment = incr;
1371
        to->to_maxval    = incr * retr;
1372
        to->to_retries   = retr;
1373
        to->to_exponential = 0;
1374
}
1375
 
1376
/*
 * Initialize an RPC client
 *
 * Allocates and initializes the transport structure: peer address,
 * congestion parameters, locks, wait queues, timeouts, and the free list
 * of request slots.  Returns NULL on allocation failure.
 */
static struct rpc_xprt *
xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
        int             i;

        dprintk("RPC:      setting up %s transport...\n",
                                proto == IPPROTO_UDP? "UDP" : "TCP");

        if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
                return NULL;
        memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

        xprt->addr = *ap;
        xprt->prot = proto;
        xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
        if (xprt->stream) {
                /* TCP provides its own congestion control */
                xprt->cwnd = RPC_MAXCWND;
                xprt->nocong = 1;
        } else
                xprt->cwnd = RPC_INITCWND;
        spin_lock_init(&xprt->sock_lock);
        spin_lock_init(&xprt->xprt_lock);
        init_waitqueue_head(&xprt->cong_wait);

        INIT_LIST_HEAD(&xprt->recv);

        /* Set timeout parameters */
        if (to) {
                xprt->timeout = *to;
                xprt->timeout.to_current = to->to_initval;
        } else
                xprt_default_timeout(&xprt->timeout, xprt->prot);

        INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
        INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
        INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
        INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");

        /* initialize free list: chain all slots, last one terminates it */
        for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
                req->rq_next = req + 1;
        req->rq_next = NULL;
        xprt->free = xprt->slot;

        /* Check whether we want to use a reserved port */
        xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;

        dprintk("RPC:      created transport %p\n", xprt);

        return xprt;
}
1432
 
1433
/*
 * Bind to a reserved port
 *
 * Walks downward from port 800 looking for a free privileged port.
 * Temporarily raises CAP_NET_BIND_SERVICE (the permission was checked at
 * mount time) and restores the original capability set before returning.
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
        struct sockaddr_in myaddr;
        int             err, port;
        kernel_cap_t saved_cap = current->cap_effective;

        /* Override capabilities.
         * They were checked in xprt_create_proto i.e. at mount time
         */
        cap_raise (current->cap_effective, CAP_NET_BIND_SERVICE);

        memset(&myaddr, 0, sizeof(myaddr));
        myaddr.sin_family = AF_INET;
        port = 800;
        do {
                myaddr.sin_port = htons(port);
                err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
                                                sizeof(myaddr));
        } while (err == -EADDRINUSE && --port > 0);
        current->cap_effective = saved_cap;

        if (err < 0)
                printk("RPC: Can't bind to reserved port (%d).\n", -err);

        return err;
}
1463
 
1464
/*
 * Attach a freshly created socket to the transport: save the socket's
 * original callbacks, install the RPC ones, and record the socket on the
 * transport.  Callback swap is done under the socket's callback_lock so
 * it cannot race with callbacks in flight.  Returns -EBUSY if a socket
 * is already bound.
 */
static int
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
        struct sock     *sk = sock->sk;

        if (xprt->inet)
                return -EBUSY;

        write_lock_bh(&sk->callback_lock);
        sk->user_data = xprt;
        /* Remember the original callbacks so they can be restored later */
        xprt->old_data_ready = sk->data_ready;
        xprt->old_state_change = sk->state_change;
        xprt->old_write_space = sk->write_space;
        if (xprt->prot == IPPROTO_UDP) {
                sk->data_ready = udp_data_ready;
                sk->no_check = UDP_CSUM_NORCV;
                /* UDP is connectionless: consider it connected at once */
                xprt_set_connected(xprt);
        } else {
                struct tcp_opt *tp = &(sk->tp_pinfo.af_tcp);
                tp->nonagle = 1;        /* disable Nagle's algorithm */
                sk->data_ready = tcp_data_ready;
                sk->state_change = tcp_state_change;
                xprt_clear_connected(xprt);
        }
        sk->write_space = xprt_write_space;

        /* Reset to new socket */
        xprt->sock = sock;
        xprt->inet = sk;
        write_unlock_bh(&sk->callback_lock);

        return 0;
}
1497
 
1498
/*
1499
 * Set socket buffer length
1500
 */
1501
void
1502
xprt_sock_setbufsize(struct rpc_xprt *xprt)
1503
{
1504
        struct sock *sk = xprt->inet;
1505
 
1506
        if (xprt->stream)
1507
                return;
1508
        if (xprt->rcvsize) {
1509
                sk->userlocks |= SOCK_RCVBUF_LOCK;
1510
                sk->rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
1511
        }
1512
        if (xprt->sndsize) {
1513
                sk->userlocks |= SOCK_SNDBUF_LOCK;
1514
                sk->sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
1515
                sk->write_space(sk);
1516
        }
1517
}
1518
 
1519
/*
1520
 * Create a client socket given the protocol and peer address.
1521
 */
1522
static struct socket *
1523
xprt_create_socket(int proto, struct rpc_timeout *to, int resvport)
1524
{
1525
        struct socket   *sock;
1526
        int             type, err;
1527
 
1528
        dprintk("RPC:      xprt_create_socket(%s %d)\n",
1529
                           (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
1530
 
1531
        type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
1532
 
1533
        if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
1534
                printk("RPC: can't create socket (%d).\n", -err);
1535
                goto failed;
1536
        }
1537
 
1538
        /* bind to a reserved port */
1539
        if (resvport && xprt_bindresvport(sock) < 0)
1540
                goto failed;
1541
 
1542
        return sock;
1543
 
1544
failed:
1545
        sock_release(sock);
1546
        return NULL;
1547
}
1548
 
1549
/*
1550
 * Create an RPC client transport given the protocol and peer address.
1551
 */
1552
struct rpc_xprt *
1553
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
1554
{
1555
        struct rpc_xprt *xprt;
1556
 
1557
        xprt = xprt_setup(proto, sap, to);
1558
        if (!xprt)
1559
                goto out_bad;
1560
 
1561
        dprintk("RPC:      xprt_create_proto created xprt %p\n", xprt);
1562
        return xprt;
1563
out_bad:
1564
        dprintk("RPC:      xprt_create_proto failed\n");
1565
        if (xprt)
1566
                kfree(xprt);
1567
        return NULL;
1568
}
1569
 
1570
/*
1571
 * Prepare for transport shutdown.
1572
 */
1573
void
1574
xprt_shutdown(struct rpc_xprt *xprt)
1575
{
1576
        xprt->shutdown = 1;
1577
        rpc_wake_up(&xprt->sending);
1578
        rpc_wake_up(&xprt->resend);
1579
        rpc_wake_up(&xprt->pending);
1580
        rpc_wake_up(&xprt->backlog);
1581
        if (waitqueue_active(&xprt->cong_wait))
1582
                wake_up(&xprt->cong_wait);
1583
}
1584
 
1585
/*
1586
 * Clear the xprt backlog queue
1587
 */
1588
int
1589
xprt_clear_backlog(struct rpc_xprt *xprt) {
1590
        rpc_wake_up_next(&xprt->backlog);
1591
        if (waitqueue_active(&xprt->cong_wait))
1592
                wake_up(&xprt->cong_wait);
1593
        return 1;
1594
}
1595
 
1596
/*
 * Destroy an RPC transport, killing off all requests.
 *
 * Shuts the transport down, closes the socket, and frees the structure.
 * Always returns 0.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC:      destroying transport %p\n", xprt);

        xprt_shutdown(xprt);
        xprt_close(xprt);
        kfree(xprt);

        return 0;
}

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.