URL
https://opencores.org/ocsvn/openrisc/openrisc/trunk
Subversion Repositories openrisc
[/] [openrisc/] [trunk/] [gnu-dev/] [or1k-gcc/] [libgo/] [go/] [old/] [netchan/] [import.go] - Rev 747
Compare with Previous | Blame | View Log
// Copyright 2010 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.package netchanimport ("errors""io""log""net""reflect""sync""time")// Import// impLog is a logging convenience function. The first argument must be a string.func impLog(args ...interface{}) {args[0] = "netchan import: " + args[0].(string)log.Print(args...)}// An Importer allows a set of channels to be imported from a single// remote machine/network port. A machine may have multiple// importers, even from the same machine/network port.type Importer struct {*encDecchanLock sync.Mutex // protects access to channel mapnames map[string]*netChanchans map[int]*netChanerrors chan errormaxId intmu sync.Mutex // protects remaining fieldsunacked int64 // number of unacknowledged sends.seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu}// NewImporter creates a new Importer object to import a set of channels// from the given connection. The Exporter must be available and serving when// the Importer is created.func NewImporter(conn io.ReadWriter) *Importer {imp := new(Importer)imp.encDec = newEncDec(conn)imp.chans = make(map[int]*netChan)imp.names = make(map[string]*netChan)imp.errors = make(chan error, 10)imp.unacked = 0go imp.run()return imp}// Import imports a set of channels from the given network and address.func Import(network, remoteaddr string) (*Importer, error) {conn, err := net.Dial(network, remoteaddr)if err != nil {return nil, err}return NewImporter(conn), nil}// shutdown closes all channels for which we are receiving data from the remote side.func (imp *Importer) shutdown() {imp.chanLock.Lock()for _, ich := range imp.chans {if ich.dir == Recv {ich.close()}}imp.chanLock.Unlock()}// Handle the data from a single imported data stream, which will// have the form// (response, data)*// The response identifies by name which channel is transmitting data.func (imp *Importer) run() {// Loop on responses; requests are sent by ImportNValues()hdr := new(header)hdrValue := reflect.ValueOf(hdr)ackHdr := new(header)err := new(error_)errValue := reflect.ValueOf(err)for {*hdr = header{}if e := imp.decode(hdrValue); e != nil {if e != io.EOF {impLog("header:", e)imp.shutdown()}return}switch hdr.PayloadType {case payData:// done lower in loopcase payError:if e := imp.decode(errValue); e != nil {impLog("error:", e)return}if err.Error != "" {impLog("response error:", err.Error)select {case imp.errors <- errors.New(err.Error):continue // errors are not acknowledgeddefault:imp.shutdown()return}}case payClosed:nch := imp.getChan(hdr.Id, false)if nch != nil {nch.close()}continue // closes are not acknowledged.case payAckSend:// we can receive spurious acks if the channel is// hung up, so we ask getChan to ignore any errors.nch := imp.getChan(hdr.Id, true)if nch != nil {nch.acked()imp.mu.Lock()imp.unacked--imp.mu.Unlock()}continuedefault:impLog("unexpected payload type:", hdr.PayloadType)return}nch := imp.getChan(hdr.Id, false)if nch == nil {continue}if nch.dir != Recv {impLog("cannot happen: receive from non-Recv channel")return}// Acknowledge receiptackHdr.Id = hdr.IdackHdr.SeqNum = hdr.SeqNumimp.encode(ackHdr, payAck, nil)// Create a new value for each received item.value := reflect.New(nch.ch.Type().Elem()).Elem()if e := imp.decode(value); e != nil {impLog("importer value decode:", e)return}nch.send(value)}}func (imp *Importer) getChan(id int, errOk bool) *netChan {imp.chanLock.Lock()ich := imp.chans[id]imp.chanLock.Unlock()if ich == nil {if !errOk {impLog("unknown id in netchan request: ", id)}return nil}return ich}// Errors returns a channel from which transmission and protocol errors// can be read. Clients of the importer are not required to read the error// channel for correct execution. However, if too many errors occur// without being read from the error channel, the importer will shut down.func (imp *Importer) Errors() chan error {return imp.errors}// Import imports a channel of the given type, size and specified direction.// It is equivalent to ImportNValues with a count of -1, meaning unbounded.func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) error {return imp.ImportNValues(name, chT, dir, size, -1)}// ImportNValues imports a channel of the given type and specified// direction and then receives or transmits up to n values on that// channel. A value of n==-1 implies an unbounded number of values. The// channel will have buffer space for size values, or 1 value if size < 1.// The channel to be bound to the remote site's channel is provided// in the call and may be of arbitrary channel type.// Despite the literal signature, the effective signature is// ImportNValues(name string, chT chan T, dir Dir, size, n int) error// Example usage:// imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234")// if err != nil { log.Fatal(err) }// ch := make(chan myType)// err = imp.ImportNValues("name", ch, Recv, 1, 1)// if err != nil { log.Fatal(err) }// fmt.Printf("%+v\n", <-ch)func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) error {ch, err := checkChan(chT, dir)if err != nil {return err}imp.chanLock.Lock()defer imp.chanLock.Unlock()_, present := imp.names[name]if present {return errors.New("channel name already being imported:" + name)}if size < 1 {size = 1}id := imp.maxIdimp.maxId++nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n))imp.names[name] = nchimp.chans[id] = nch// Tell the other side about this channel.hdr := &header{Id: id}req := &request{Name: name, Count: int64(n), Dir: dir, Size: size}if err = imp.encode(hdr, payRequest, req); err != nil {impLog("request encode:", err)return err}if dir == Send {go func() {for i := 0; n == -1 || i < n; i++ {val, ok := nch.recv()if !ok {if err = imp.encode(hdr, payClosed, nil); err != nil {impLog("error encoding client closed message:", err)}return}// We hold the lock during transmission to guarantee messages are// sent in order.imp.mu.Lock()imp.unacked++imp.seqLock.Lock()imp.mu.Unlock()if err = imp.encode(hdr, payData, val.Interface()); err != nil {impLog("error encoding client send:", err)return}imp.seqLock.Unlock()}}()}return nil}// Hangup disassociates the named channel from the Importer and closes// the channel. Messages in flight for the channel may be dropped.func (imp *Importer) Hangup(name string) error {imp.chanLock.Lock()defer imp.chanLock.Unlock()nc := imp.names[name]if nc == nil {return errors.New("netchan import: hangup: no such channel: " + name)}delete(imp.names, name)delete(imp.chans, nc.id)nc.close()return nil}func (imp *Importer) unackedCount() int64 {imp.mu.Lock()n := imp.unackedimp.mu.Unlock()return n}// Drain waits until all messages sent from this exporter/importer, including// those not yet sent to any server and possibly including those sent while// Drain was executing, have been received by the exporter. In short, it// waits until all the importer's messages have been received.// If the timeout (measured in nanoseconds) is positive and Drain takes// longer than that to complete, an error is returned.func (imp *Importer) Drain(timeout int64) error {deadline := time.Now().Add(time.Duration(timeout))for imp.unackedCount() > 0 {if timeout > 0 && time.Now().After(deadline) {return errors.New("timeout")}time.Sleep(100 * time.Millisecond)}return nil}
