OpenCores
URL https://opencores.org/ocsvn/openrisc/openrisc/trunk

Subversion Repositories openrisc

[/] [openrisc/] [trunk/] [gnu-dev/] [or1k-gcc/] [libgo/] [go/] [net/] [rpc/] [client.go] - Blame information for rev 791

Go to most recent revision | Details | Compare with Previous | View Log

Line No. Rev Author Line
1 747 jeremybenn
// Copyright 2009 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4
 
5
package rpc
6
 
7
import (
8
        "bufio"
9
        "encoding/gob"
10
        "errors"
11
        "io"
12
        "log"
13
        "net"
14
        "net/http"
15
        "sync"
16
)
17
 
18
// ServerError represents an error that has been returned from
19
// the remote side of the RPC connection.
20
type ServerError string
21
 
22
func (e ServerError) Error() string {
23
        return string(e)
24
}
25
 
26
var ErrShutdown = errors.New("connection is shut down")
27
 
28
// Call represents an active RPC.
29
type Call struct {
30
        ServiceMethod string      // The name of the service and method to call.
31
        Args          interface{} // The argument to the function (*struct).
32
        Reply         interface{} // The reply from the function (*struct).
33
        Error         error       // After completion, the error status.
34
        Done          chan *Call  // Strobes when call is complete.
35
}
36
 
37
// Client represents an RPC Client.
38
// There may be multiple outstanding Calls associated
39
// with a single Client.
40
type Client struct {
41
        mutex    sync.Mutex // protects pending, seq, request
42
        sending  sync.Mutex
43
        request  Request
44
        seq      uint64
45
        codec    ClientCodec
46
        pending  map[uint64]*Call
47
        closing  bool
48
        shutdown bool
49
}
50
 
51
// A ClientCodec implements writing of RPC requests and
52
// reading of RPC responses for the client side of an RPC session.
53
// The client calls WriteRequest to write a request to the connection
54
// and calls ReadResponseHeader and ReadResponseBody in pairs
55
// to read responses.  The client calls Close when finished with the
56
// connection. ReadResponseBody may be called with a nil
57
// argument to force the body of the response to be read and then
58
// discarded.
59
type ClientCodec interface {
60
        WriteRequest(*Request, interface{}) error
61
        ReadResponseHeader(*Response) error
62
        ReadResponseBody(interface{}) error
63
 
64
        Close() error
65
}
66
 
67
func (client *Client) send(call *Call) {
68
        client.sending.Lock()
69
        defer client.sending.Unlock()
70
 
71
        // Register this call.
72
        client.mutex.Lock()
73
        if client.shutdown {
74
                call.Error = ErrShutdown
75
                client.mutex.Unlock()
76
                call.done()
77
                return
78
        }
79
        seq := client.seq
80
        client.seq++
81
        client.pending[seq] = call
82
        client.mutex.Unlock()
83
 
84
        // Encode and send the request.
85
        client.request.Seq = seq
86
        client.request.ServiceMethod = call.ServiceMethod
87
        err := client.codec.WriteRequest(&client.request, call.Args)
88
        if err != nil {
89
                client.mutex.Lock()
90
                delete(client.pending, seq)
91
                client.mutex.Unlock()
92
                call.Error = err
93
                call.done()
94
        }
95
}
96
 
97
func (client *Client) input() {
98
        var err error
99
        var response Response
100
        for err == nil {
101
                response = Response{}
102
                err = client.codec.ReadResponseHeader(&response)
103
                if err != nil {
104
                        if err == io.EOF && !client.closing {
105
                                err = io.ErrUnexpectedEOF
106
                        }
107
                        break
108
                }
109
                seq := response.Seq
110
                client.mutex.Lock()
111
                call := client.pending[seq]
112
                delete(client.pending, seq)
113
                client.mutex.Unlock()
114
 
115
                if response.Error == "" {
116
                        err = client.codec.ReadResponseBody(call.Reply)
117
                        if err != nil {
118
                                call.Error = errors.New("reading body " + err.Error())
119
                        }
120
                } else {
121
                        // We've got an error response. Give this to the request;
122
                        // any subsequent requests will get the ReadResponseBody
123
                        // error if there is one.
124
                        call.Error = ServerError(response.Error)
125
                        err = client.codec.ReadResponseBody(nil)
126
                        if err != nil {
127
                                err = errors.New("reading error body: " + err.Error())
128
                        }
129
                }
130
                call.done()
131
        }
132
        // Terminate pending calls.
133
        client.sending.Lock()
134
        client.mutex.Lock()
135
        client.shutdown = true
136
        closing := client.closing
137
        for _, call := range client.pending {
138
                call.Error = err
139
                call.done()
140
        }
141
        client.mutex.Unlock()
142
        client.sending.Unlock()
143
        if err != io.EOF || !closing {
144
                log.Println("rpc: client protocol error:", err)
145
        }
146
}
147
 
148
func (call *Call) done() {
149
        select {
150
        case call.Done <- call:
151
                // ok
152
        default:
153
                // We don't want to block here.  It is the caller's responsibility to make
154
                // sure the channel has enough buffer space. See comment in Go().
155
                log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
156
        }
157
}
158
 
159
// NewClient returns a new Client to handle requests to the
160
// set of services at the other end of the connection.
161
// It adds a buffer to the write side of the connection so
162
// the header and payload are sent as a unit.
163
func NewClient(conn io.ReadWriteCloser) *Client {
164
        encBuf := bufio.NewWriter(conn)
165
        client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
166
        return NewClientWithCodec(client)
167
}
168
 
169
// NewClientWithCodec is like NewClient but uses the specified
170
// codec to encode requests and decode responses.
171
func NewClientWithCodec(codec ClientCodec) *Client {
172
        client := &Client{
173
                codec:   codec,
174
                pending: make(map[uint64]*Call),
175
        }
176
        go client.input()
177
        return client
178
}
179
 
180
type gobClientCodec struct {
181
        rwc    io.ReadWriteCloser
182
        dec    *gob.Decoder
183
        enc    *gob.Encoder
184
        encBuf *bufio.Writer
185
}
186
 
187
func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
188
        if err = c.enc.Encode(r); err != nil {
189
                return
190
        }
191
        if err = c.enc.Encode(body); err != nil {
192
                return
193
        }
194
        return c.encBuf.Flush()
195
}
196
 
197
func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
198
        return c.dec.Decode(r)
199
}
200
 
201
func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
202
        return c.dec.Decode(body)
203
}
204
 
205
func (c *gobClientCodec) Close() error {
206
        return c.rwc.Close()
207
}
208
 
209
// DialHTTP connects to an HTTP RPC server at the specified network address
210
// listening on the default HTTP RPC path.
211
func DialHTTP(network, address string) (*Client, error) {
212
        return DialHTTPPath(network, address, DefaultRPCPath)
213
}
214
 
215
// DialHTTPPath connects to an HTTP RPC server
216
// at the specified network address and path.
217
func DialHTTPPath(network, address, path string) (*Client, error) {
218
        var err error
219
        conn, err := net.Dial(network, address)
220
        if err != nil {
221
                return nil, err
222
        }
223
        io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
224
 
225
        // Require successful HTTP response
226
        // before switching to RPC protocol.
227
        resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
228
        if err == nil && resp.Status == connected {
229
                return NewClient(conn), nil
230
        }
231
        if err == nil {
232
                err = errors.New("unexpected HTTP response: " + resp.Status)
233
        }
234
        conn.Close()
235
        return nil, &net.OpError{
236
                Op:   "dial-http",
237
                Net:  network + " " + address,
238
                Addr: nil,
239
                Err:  err,
240
        }
241
}
242
 
243
// Dial connects to an RPC server at the specified network address.
244
func Dial(network, address string) (*Client, error) {
245
        conn, err := net.Dial(network, address)
246
        if err != nil {
247
                return nil, err
248
        }
249
        return NewClient(conn), nil
250
}
251
 
252
func (client *Client) Close() error {
253
        client.mutex.Lock()
254
        if client.shutdown || client.closing {
255
                client.mutex.Unlock()
256
                return ErrShutdown
257
        }
258
        client.closing = true
259
        client.mutex.Unlock()
260
        return client.codec.Close()
261
}
262
 
263
// Go invokes the function asynchronously.  It returns the Call structure representing
264
// the invocation.  The done channel will signal when the call is complete by returning
265
// the same Call object.  If done is nil, Go will allocate a new channel.
266
// If non-nil, done must be buffered or Go will deliberately crash.
267
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
268
        call := new(Call)
269
        call.ServiceMethod = serviceMethod
270
        call.Args = args
271
        call.Reply = reply
272
        if done == nil {
273
                done = make(chan *Call, 10) // buffered.
274
        } else {
275
                // If caller passes done != nil, it must arrange that
276
                // done has enough buffer for the number of simultaneous
277
                // RPCs that will be using that channel.  If the channel
278
                // is totally unbuffered, it's best not to run at all.
279
                if cap(done) == 0 {
280
                        log.Panic("rpc: done channel is unbuffered")
281
                }
282
        }
283
        call.Done = done
284
        client.send(call)
285
        return call
286
}
287
 
288
// Call invokes the named function, waits for it to complete, and returns its error status.
289
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
290
        call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
291
        return call.Error
292
}

powered by: WebSVN 2.1.0

© copyright 1999-2025 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.