URL
https://opencores.org/ocsvn/openrisc/openrisc/trunk
Subversion Repositories openrisc
[/] [openrisc/] [trunk/] [gnu-dev/] [or1k-gcc/] [libgo/] [go/] [net/] [rpc/] [client.go] - Rev 749
Go to most recent revision | Compare with Previous | Blame | View Log
// Copyright 2009 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.package rpcimport ("bufio""encoding/gob""errors""io""log""net""net/http""sync")// ServerError represents an error that has been returned from// the remote side of the RPC connection.type ServerError stringfunc (e ServerError) Error() string {return string(e)}var ErrShutdown = errors.New("connection is shut down")// Call represents an active RPC.type Call struct {ServiceMethod string // The name of the service and method to call.Args interface{} // The argument to the function (*struct).Reply interface{} // The reply from the function (*struct).Error error // After completion, the error status.Done chan *Call // Strobes when call is complete.}// Client represents an RPC Client.// There may be multiple outstanding Calls associated// with a single Client.type Client struct {mutex sync.Mutex // protects pending, seq, requestsending sync.Mutexrequest Requestseq uint64codec ClientCodecpending map[uint64]*Callclosing boolshutdown bool}// A ClientCodec implements writing of RPC requests and// reading of RPC responses for the client side of an RPC session.// The client calls WriteRequest to write a request to the connection// and calls ReadResponseHeader and ReadResponseBody in pairs// to read responses. The client calls Close when finished with the// connection. ReadResponseBody may be called with a nil// argument to force the body of the response to be read and then// discarded.type ClientCodec interface {WriteRequest(*Request, interface{}) errorReadResponseHeader(*Response) errorReadResponseBody(interface{}) errorClose() error}func (client *Client) send(call *Call) {client.sending.Lock()defer client.sending.Unlock()// Register this call.client.mutex.Lock()if client.shutdown {call.Error = ErrShutdownclient.mutex.Unlock()call.done()return}seq := client.seqclient.seq++client.pending[seq] = callclient.mutex.Unlock()// Encode and send the request.client.request.Seq = seqclient.request.ServiceMethod = call.ServiceMethoderr := client.codec.WriteRequest(&client.request, call.Args)if err != nil {client.mutex.Lock()delete(client.pending, seq)client.mutex.Unlock()call.Error = errcall.done()}}func (client *Client) input() {var err errorvar response Responsefor err == nil {response = Response{}err = client.codec.ReadResponseHeader(&response)if err != nil {if err == io.EOF && !client.closing {err = io.ErrUnexpectedEOF}break}seq := response.Seqclient.mutex.Lock()call := client.pending[seq]delete(client.pending, seq)client.mutex.Unlock()if response.Error == "" {err = client.codec.ReadResponseBody(call.Reply)if err != nil {call.Error = errors.New("reading body " + err.Error())}} else {// We've got an error response. Give this to the request;// any subsequent requests will get the ReadResponseBody// error if there is one.call.Error = ServerError(response.Error)err = client.codec.ReadResponseBody(nil)if err != nil {err = errors.New("reading error body: " + err.Error())}}call.done()}// Terminate pending calls.client.sending.Lock()client.mutex.Lock()client.shutdown = trueclosing := client.closingfor _, call := range client.pending {call.Error = errcall.done()}client.mutex.Unlock()client.sending.Unlock()if err != io.EOF || !closing {log.Println("rpc: client protocol error:", err)}}func (call *Call) done() {select {case call.Done <- call:// okdefault:// We don't want to block here. It is the caller's responsibility to make// sure the channel has enough buffer space. See comment in Go().log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")}}// NewClient returns a new Client to handle requests to the// set of services at the other end of the connection.// It adds a buffer to the write side of the connection so// the header and payload are sent as a unit.func NewClient(conn io.ReadWriteCloser) *Client {encBuf := bufio.NewWriter(conn)client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}return NewClientWithCodec(client)}// NewClientWithCodec is like NewClient but uses the specified// codec to encode requests and decode responses.func NewClientWithCodec(codec ClientCodec) *Client {client := &Client{codec: codec,pending: make(map[uint64]*Call),}go client.input()return client}type gobClientCodec struct {rwc io.ReadWriteCloserdec *gob.Decoderenc *gob.EncoderencBuf *bufio.Writer}func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {if err = c.enc.Encode(r); err != nil {return}if err = c.enc.Encode(body); err != nil {return}return c.encBuf.Flush()}func (c *gobClientCodec) ReadResponseHeader(r *Response) error {return c.dec.Decode(r)}func (c *gobClientCodec) ReadResponseBody(body interface{}) error {return c.dec.Decode(body)}func (c *gobClientCodec) Close() error {return c.rwc.Close()}// DialHTTP connects to an HTTP RPC server at the specified network address// listening on the default HTTP RPC path.func DialHTTP(network, address string) (*Client, error) {return DialHTTPPath(network, address, DefaultRPCPath)}// DialHTTPPath connects to an HTTP RPC server// at the specified network address and path.func DialHTTPPath(network, address, path string) (*Client, error) {var err errorconn, err := net.Dial(network, address)if err != nil {return nil, err}io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")// Require successful HTTP response// before switching to RPC protocol.resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})if err == nil && resp.Status == connected {return NewClient(conn), nil}if err == nil {err = errors.New("unexpected HTTP response: " + resp.Status)}conn.Close()return nil, &net.OpError{Op: "dial-http",Net: network + " " + address,Addr: nil,Err: err,}}// Dial connects to an RPC server at the specified network address.func Dial(network, address string) (*Client, error) {conn, err := net.Dial(network, address)if err != nil {return nil, err}return NewClient(conn), nil}func (client *Client) Close() error {client.mutex.Lock()if client.shutdown || client.closing {client.mutex.Unlock()return ErrShutdown}client.closing = trueclient.mutex.Unlock()return client.codec.Close()}// Go invokes the function asynchronously. It returns the Call structure representing// the invocation. The done channel will signal when the call is complete by returning// the same Call object. If done is nil, Go will allocate a new channel.// If non-nil, done must be buffered or Go will deliberately crash.func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {call := new(Call)call.ServiceMethod = serviceMethodcall.Args = argscall.Reply = replyif done == nil {done = make(chan *Call, 10) // buffered.} else {// If caller passes done != nil, it must arrange that// done has enough buffer for the number of simultaneous// RPCs that will be using that channel. If the channel// is totally unbuffered, it's best not to run at all.if cap(done) == 0 {log.Panic("rpc: done channel is unbuffered")}}call.Done = doneclient.send(call)return call}// Call invokes the named function, waits for it to complete, and returns its error status.func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Donereturn call.Error}
Go to most recent revision | Compare with Previous | Blame | View Log
