OpenCores
URL https://opencores.org/ocsvn/test_project/test_project/trunk

Subversion Repositories test_project

[/] [test_project/] [trunk/] [linux_sd_driver/] [fs/] [dlm/] [rcom.c] - Blame information for rev 62

Details | Compare with Previous | View Log

Line No. Rev Author Line
1 62 marcus.erl
/******************************************************************************
2
*******************************************************************************
3
**
4
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
6
**
7
**  This copyrighted material is made available to anyone wishing to use,
8
**  modify, copy, or redistribute it subject to the terms and conditions
9
**  of the GNU General Public License v.2.
10
**
11
*******************************************************************************
12
******************************************************************************/
13
 
14
#include "dlm_internal.h"
15
#include "lockspace.h"
16
#include "member.h"
17
#include "lowcomms.h"
18
#include "midcomms.h"
19
#include "rcom.h"
20
#include "recover.h"
21
#include "dir.h"
22
#include "config.h"
23
#include "memory.h"
24
#include "lock.h"
25
#include "util.h"
26
 
27
 
28
static int rcom_response(struct dlm_ls *ls)
29
{
30
        return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
31
}
32
 
33
static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
34
                       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
35
{
36
        struct dlm_rcom *rc;
37
        struct dlm_mhandle *mh;
38
        char *mb;
39
        int mb_len = sizeof(struct dlm_rcom) + len;
40
 
41
        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
42
        if (!mh) {
43
                log_print("create_rcom to %d type %d len %d ENOBUFS",
44
                          to_nodeid, type, len);
45
                return -ENOBUFS;
46
        }
47
        memset(mb, 0, mb_len);
48
 
49
        rc = (struct dlm_rcom *) mb;
50
 
51
        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
52
        rc->rc_header.h_lockspace = ls->ls_global_id;
53
        rc->rc_header.h_nodeid = dlm_our_nodeid();
54
        rc->rc_header.h_length = mb_len;
55
        rc->rc_header.h_cmd = DLM_RCOM;
56
 
57
        rc->rc_type = type;
58
 
59
        spin_lock(&ls->ls_recover_lock);
60
        rc->rc_seq = ls->ls_recover_seq;
61
        spin_unlock(&ls->ls_recover_lock);
62
 
63
        *mh_ret = mh;
64
        *rc_ret = rc;
65
        return 0;
66
}
67
 
68
static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
69
                      struct dlm_rcom *rc)
70
{
71
        dlm_rcom_out(rc);
72
        dlm_lowcomms_commit_buffer(mh);
73
}
74
 
75
/* When replying to a status request, a node also sends back its
76
   configuration values.  The requesting node then checks that the remote
77
   node is configured the same way as itself. */
78
 
79
static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
80
{
81
        rf->rf_lvblen = ls->ls_lvblen;
82
        rf->rf_lsflags = ls->ls_exflags;
83
}
84
 
85
static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
86
{
87
        struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
88
 
89
        if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
90
                log_error(ls, "version mismatch: %x nodeid %d: %x",
91
                          DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
92
                          rc->rc_header.h_version);
93
                return -EPROTO;
94
        }
95
 
96
        if (rf->rf_lvblen != ls->ls_lvblen ||
97
            rf->rf_lsflags != ls->ls_exflags) {
98
                log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
99
                          ls->ls_lvblen, ls->ls_exflags,
100
                          nodeid, rf->rf_lvblen, rf->rf_lsflags);
101
                return -EPROTO;
102
        }
103
        return 0;
104
}
105
 
106
static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
107
{
108
        spin_lock(&ls->ls_rcom_spin);
109
        *new_seq = ++ls->ls_rcom_seq;
110
        set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
111
        spin_unlock(&ls->ls_rcom_spin);
112
}
113
 
114
static void disallow_sync_reply(struct dlm_ls *ls)
115
{
116
        spin_lock(&ls->ls_rcom_spin);
117
        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
118
        clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
119
        spin_unlock(&ls->ls_rcom_spin);
120
}
121
 
122
int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
123
{
124
        struct dlm_rcom *rc;
125
        struct dlm_mhandle *mh;
126
        int error = 0;
127
 
128
        ls->ls_recover_nodeid = nodeid;
129
 
130
        if (nodeid == dlm_our_nodeid()) {
131
                rc = (struct dlm_rcom *) ls->ls_recover_buf;
132
                rc->rc_result = dlm_recover_status(ls);
133
                goto out;
134
        }
135
 
136
        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
137
        if (error)
138
                goto out;
139
 
140
        allow_sync_reply(ls, &rc->rc_id);
141
        memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
142
 
143
        send_rcom(ls, mh, rc);
144
 
145
        error = dlm_wait_function(ls, &rcom_response);
146
        disallow_sync_reply(ls);
147
        if (error)
148
                goto out;
149
 
150
        rc = (struct dlm_rcom *) ls->ls_recover_buf;
151
 
152
        if (rc->rc_result == -ESRCH) {
153
                /* we pretend the remote lockspace exists with 0 status */
154
                log_debug(ls, "remote node %d not ready", nodeid);
155
                rc->rc_result = 0;
156
        } else
157
                error = check_config(ls, rc, nodeid);
158
        /* the caller looks at rc_result for the remote recovery status */
159
 out:
160
        return error;
161
}
162
 
163
static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
164
{
165
        struct dlm_rcom *rc;
166
        struct dlm_mhandle *mh;
167
        int error, nodeid = rc_in->rc_header.h_nodeid;
168
 
169
        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
170
                            sizeof(struct rcom_config), &rc, &mh);
171
        if (error)
172
                return;
173
        rc->rc_id = rc_in->rc_id;
174
        rc->rc_seq_reply = rc_in->rc_seq;
175
        rc->rc_result = dlm_recover_status(ls);
176
        make_config(ls, (struct rcom_config *) rc->rc_buf);
177
 
178
        send_rcom(ls, mh, rc);
179
}
180
 
181
static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
182
{
183
        spin_lock(&ls->ls_rcom_spin);
184
        if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
185
            rc_in->rc_id != ls->ls_rcom_seq) {
186
                log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
187
                          rc_in->rc_type, rc_in->rc_header.h_nodeid,
188
                          (unsigned long long)rc_in->rc_id,
189
                          (unsigned long long)ls->ls_rcom_seq);
190
                goto out;
191
        }
192
        memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
193
        set_bit(LSFL_RCOM_READY, &ls->ls_flags);
194
        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
195
        wake_up(&ls->ls_wait_general);
196
 out:
197
        spin_unlock(&ls->ls_rcom_spin);
198
}
199
 
200
static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
201
{
202
        receive_sync_reply(ls, rc_in);
203
}
204
 
205
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
206
{
207
        struct dlm_rcom *rc;
208
        struct dlm_mhandle *mh;
209
        int error = 0, len = sizeof(struct dlm_rcom);
210
 
211
        ls->ls_recover_nodeid = nodeid;
212
 
213
        if (nodeid == dlm_our_nodeid()) {
214
                dlm_copy_master_names(ls, last_name, last_len,
215
                                      ls->ls_recover_buf + len,
216
                                      dlm_config.ci_buffer_size - len, nodeid);
217
                goto out;
218
        }
219
 
220
        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
221
        if (error)
222
                goto out;
223
        memcpy(rc->rc_buf, last_name, last_len);
224
 
225
        allow_sync_reply(ls, &rc->rc_id);
226
        memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
227
 
228
        send_rcom(ls, mh, rc);
229
 
230
        error = dlm_wait_function(ls, &rcom_response);
231
        disallow_sync_reply(ls);
232
 out:
233
        return error;
234
}
235
 
236
static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
237
{
238
        struct dlm_rcom *rc;
239
        struct dlm_mhandle *mh;
240
        int error, inlen, outlen, nodeid;
241
 
242
        nodeid = rc_in->rc_header.h_nodeid;
243
        inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
244
        outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
245
 
246
        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
247
        if (error)
248
                return;
249
        rc->rc_id = rc_in->rc_id;
250
        rc->rc_seq_reply = rc_in->rc_seq;
251
 
252
        dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
253
                              nodeid);
254
        send_rcom(ls, mh, rc);
255
}
256
 
257
static void receive_rcom_names_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
258
{
259
        receive_sync_reply(ls, rc_in);
260
}
261
 
262
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
263
{
264
        struct dlm_rcom *rc;
265
        struct dlm_mhandle *mh;
266
        struct dlm_ls *ls = r->res_ls;
267
        int error;
268
 
269
        error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
270
                            &rc, &mh);
271
        if (error)
272
                goto out;
273
        memcpy(rc->rc_buf, r->res_name, r->res_length);
274
        rc->rc_id = (unsigned long) r;
275
 
276
        send_rcom(ls, mh, rc);
277
 out:
278
        return error;
279
}
280
 
281
static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
282
{
283
        struct dlm_rcom *rc;
284
        struct dlm_mhandle *mh;
285
        int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
286
        int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
287
 
288
        error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
289
        if (error)
290
                return;
291
 
292
        error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid);
293
        if (error)
294
                ret_nodeid = error;
295
        rc->rc_result = ret_nodeid;
296
        rc->rc_id = rc_in->rc_id;
297
        rc->rc_seq_reply = rc_in->rc_seq;
298
 
299
        send_rcom(ls, mh, rc);
300
}
301
 
302
static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
303
{
304
        dlm_recover_master_reply(ls, rc_in);
305
}
306
 
307
static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
308
                           struct rcom_lock *rl)
309
{
310
        memset(rl, 0, sizeof(*rl));
311
 
312
        rl->rl_ownpid = lkb->lkb_ownpid;
313
        rl->rl_lkid = lkb->lkb_id;
314
        rl->rl_exflags = lkb->lkb_exflags;
315
        rl->rl_flags = lkb->lkb_flags;
316
        rl->rl_lvbseq = lkb->lkb_lvbseq;
317
        rl->rl_rqmode = lkb->lkb_rqmode;
318
        rl->rl_grmode = lkb->lkb_grmode;
319
        rl->rl_status = lkb->lkb_status;
320
        rl->rl_wait_type = lkb->lkb_wait_type;
321
 
322
        if (lkb->lkb_bastaddr)
323
                rl->rl_asts |= AST_BAST;
324
        if (lkb->lkb_astaddr)
325
                rl->rl_asts |= AST_COMP;
326
 
327
        rl->rl_namelen = r->res_length;
328
        memcpy(rl->rl_name, r->res_name, r->res_length);
329
 
330
        /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
331
           If so, receive_rcom_lock_args() won't take this copy. */
332
 
333
        if (lkb->lkb_lvbptr)
334
                memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
335
}
336
 
337
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
338
{
339
        struct dlm_ls *ls = r->res_ls;
340
        struct dlm_rcom *rc;
341
        struct dlm_mhandle *mh;
342
        struct rcom_lock *rl;
343
        int error, len = sizeof(struct rcom_lock);
344
 
345
        if (lkb->lkb_lvbptr)
346
                len += ls->ls_lvblen;
347
 
348
        error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
349
        if (error)
350
                goto out;
351
 
352
        rl = (struct rcom_lock *) rc->rc_buf;
353
        pack_rcom_lock(r, lkb, rl);
354
        rc->rc_id = (unsigned long) r;
355
 
356
        send_rcom(ls, mh, rc);
357
 out:
358
        return error;
359
}
360
 
361
static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
362
{
363
        struct dlm_rcom *rc;
364
        struct dlm_mhandle *mh;
365
        int error, nodeid = rc_in->rc_header.h_nodeid;
366
 
367
        dlm_recover_master_copy(ls, rc_in);
368
 
369
        error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
370
                            sizeof(struct rcom_lock), &rc, &mh);
371
        if (error)
372
                return;
373
 
374
        /* We send back the same rcom_lock struct we received, but
375
           dlm_recover_master_copy() has filled in rl_remid and rl_result */
376
 
377
        memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
378
        rc->rc_id = rc_in->rc_id;
379
        rc->rc_seq_reply = rc_in->rc_seq;
380
 
381
        send_rcom(ls, mh, rc);
382
}
383
 
384
static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
385
{
386
        dlm_recover_process_copy(ls, rc_in);
387
}
388
 
389
/* If the lockspace doesn't exist then still send a status message
390
   back; it's possible that it just doesn't have its global_id yet. */
391
 
392
int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
393
{
394
        struct dlm_rcom *rc;
395
        struct rcom_config *rf;
396
        struct dlm_mhandle *mh;
397
        char *mb;
398
        int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
399
 
400
        mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
401
        if (!mh)
402
                return -ENOBUFS;
403
        memset(mb, 0, mb_len);
404
 
405
        rc = (struct dlm_rcom *) mb;
406
 
407
        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
408
        rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
409
        rc->rc_header.h_nodeid = dlm_our_nodeid();
410
        rc->rc_header.h_length = mb_len;
411
        rc->rc_header.h_cmd = DLM_RCOM;
412
 
413
        rc->rc_type = DLM_RCOM_STATUS_REPLY;
414
        rc->rc_id = rc_in->rc_id;
415
        rc->rc_seq_reply = rc_in->rc_seq;
416
        rc->rc_result = -ESRCH;
417
 
418
        rf = (struct rcom_config *) rc->rc_buf;
419
        rf->rf_lvblen = -1;
420
 
421
        dlm_rcom_out(rc);
422
        dlm_lowcomms_commit_buffer(mh);
423
 
424
        return 0;
425
}
426
 
427
static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
428
{
429
        uint64_t seq;
430
        int rv = 0;
431
 
432
        switch (rc->rc_type) {
433
        case DLM_RCOM_STATUS_REPLY:
434
        case DLM_RCOM_NAMES_REPLY:
435
        case DLM_RCOM_LOOKUP_REPLY:
436
        case DLM_RCOM_LOCK_REPLY:
437
                spin_lock(&ls->ls_recover_lock);
438
                seq = ls->ls_recover_seq;
439
                spin_unlock(&ls->ls_recover_lock);
440
                if (rc->rc_seq_reply != seq) {
441
                        log_debug(ls, "ignoring old reply %x from %d "
442
                                      "seq_reply %llx expect %llx",
443
                                      rc->rc_type, rc->rc_header.h_nodeid,
444
                                      (unsigned long long)rc->rc_seq_reply,
445
                                      (unsigned long long)seq);
446
                        rv = 1;
447
                }
448
        }
449
        return rv;
450
}
451
 
452
/* Called by dlm_recv; corresponds to dlm_receive_message() but special
453
   recovery-only comms are sent through here. */
454
 
455
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
456
{
457
        if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
458
                log_debug(ls, "ignoring recovery message %x from %d",
459
                          rc->rc_type, nodeid);
460
                goto out;
461
        }
462
 
463
        if (is_old_reply(ls, rc))
464
                goto out;
465
 
466
        switch (rc->rc_type) {
467
        case DLM_RCOM_STATUS:
468
                receive_rcom_status(ls, rc);
469
                break;
470
 
471
        case DLM_RCOM_NAMES:
472
                receive_rcom_names(ls, rc);
473
                break;
474
 
475
        case DLM_RCOM_LOOKUP:
476
                receive_rcom_lookup(ls, rc);
477
                break;
478
 
479
        case DLM_RCOM_LOCK:
480
                receive_rcom_lock(ls, rc);
481
                break;
482
 
483
        case DLM_RCOM_STATUS_REPLY:
484
                receive_rcom_status_reply(ls, rc);
485
                break;
486
 
487
        case DLM_RCOM_NAMES_REPLY:
488
                receive_rcom_names_reply(ls, rc);
489
                break;
490
 
491
        case DLM_RCOM_LOOKUP_REPLY:
492
                receive_rcom_lookup_reply(ls, rc);
493
                break;
494
 
495
        case DLM_RCOM_LOCK_REPLY:
496
                receive_rcom_lock_reply(ls, rc);
497
                break;
498
 
499
        default:
500
                DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type););
501
        }
502
 out:
503
        return;
504
}
505
 

powered by: WebSVN 2.1.0

© copyright 1999-2025 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.