OpenCores
URL https://opencores.org/ocsvn/openrisc/openrisc/trunk

Subversion Repositories openrisc

[/] [openrisc/] [trunk/] [gnu-dev/] [or1k-gcc/] [libgo/] [go/] [old/] [netchan/] [common.go] - Rev 747

Compare with Previous | Blame | View Log

// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package netchan

import (
        "encoding/gob"
        "errors"
        "io"
        "reflect"
        "sync"
        "time"
)

// The direction of a connection from the client's perspective.
type Dir int

const (
        Recv Dir = iota
        Send
)

func (dir Dir) String() string {
        switch dir {
        case Recv:
                return "Recv"
        case Send:
                return "Send"
        }
        return "???"
}

// Payload types
const (
        payRequest = iota // request structure follows
        payError          // error structure follows
        payData           // user payload follows
        payAck            // acknowledgement; no payload
        payClosed         // channel is now closed
        payAckSend        // payload has been delivered.
)

// A header is sent as a prefix to every transmission.  It will be followed by
// a request structure, an error structure, or an arbitrary user payload structure.
type header struct {
        Id          int
        PayloadType int
        SeqNum      int64
}

// Sent with a header once per channel from importer to exporter to report
// that it wants to bind to a channel with the specified direction for count
// messages, with space for size buffered values. If count is -1, it means unlimited.
type request struct {
        Name  string
        Count int64
        Size  int
        Dir   Dir
}

// Sent with a header to report an error.
type error_ struct {
        Error string
}

// Used to unify management of acknowledgements for import and export.
type unackedCounter interface {
        unackedCount() int64
        ack() int64
        seq() int64
}

// A channel and its direction.
type chanDir struct {
        ch  reflect.Value
        dir Dir
}

// clientSet contains the objects and methods needed for tracking
// clients of an exporter and draining outstanding messages.
type clientSet struct {
        mu      sync.Mutex // protects access to channel and client maps
        names   map[string]*chanDir
        clients map[unackedCounter]bool
}

// Mutex-protected encoder and decoder pair.
type encDec struct {
        decLock sync.Mutex
        dec     *gob.Decoder
        encLock sync.Mutex
        enc     *gob.Encoder
}

func newEncDec(conn io.ReadWriter) *encDec {
        return &encDec{
                dec: gob.NewDecoder(conn),
                enc: gob.NewEncoder(conn),
        }
}

// Decode an item from the connection.
func (ed *encDec) decode(value reflect.Value) error {
        ed.decLock.Lock()
        err := ed.dec.DecodeValue(value)
        if err != nil {
                // TODO: tear down connection?
        }
        ed.decLock.Unlock()
        return err
}

// Encode a header and payload onto the connection.
func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error {
        ed.encLock.Lock()
        hdr.PayloadType = payloadType
        err := ed.enc.Encode(hdr)
        if err == nil {
                if payload != nil {
                        err = ed.enc.Encode(payload)
                }
        }
        if err != nil {
                // TODO: tear down connection if there is an error?
        }
        ed.encLock.Unlock()
        return err
}

// See the comment for Exporter.Drain.
func (cs *clientSet) drain(timeout time.Duration) error {
        deadline := time.Now().Add(timeout)
        for {
                pending := false
                cs.mu.Lock()
                // Any messages waiting for a client?
                for _, chDir := range cs.names {
                        if chDir.ch.Len() > 0 {
                                pending = true
                        }
                }
                // Any unacknowledged messages?
                for client := range cs.clients {
                        n := client.unackedCount()
                        if n > 0 { // Check for > rather than != just to be safe.
                                pending = true
                                break
                        }
                }
                cs.mu.Unlock()
                if !pending {
                        break
                }
                if timeout > 0 && time.Now().After(deadline) {
                        return errors.New("timeout")
                }
                time.Sleep(100 * time.Millisecond)
        }
        return nil
}

// See the comment for Exporter.Sync.
func (cs *clientSet) sync(timeout time.Duration) error {
        deadline := time.Now().Add(timeout)
        // seq remembers the clients and their seqNum at point of entry.
        seq := make(map[unackedCounter]int64)
        cs.mu.Lock()
        for client := range cs.clients {
                seq[client] = client.seq()
        }
        cs.mu.Unlock()
        for {
                pending := false
                cs.mu.Lock()
                // Any unacknowledged messages?  Look only at clients that existed
                // when we started and are still in this client set.
                for client := range seq {
                        if _, ok := cs.clients[client]; ok {
                                if client.ack() < seq[client] {
                                        pending = true
                                        break
                                }
                        }
                }
                cs.mu.Unlock()
                if !pending {
                        break
                }
                if timeout > 0 && time.Now().After(deadline) {
                        return errors.New("timeout")
                }
                time.Sleep(100 * time.Millisecond)
        }
        return nil
}

// A netChan represents a channel imported or exported
// on a single connection. Flow is controlled by the receiving
// side by sending payAckSend messages when values
// are delivered into the local channel.
type netChan struct {
        *chanDir
        name   string
        id     int
        size   int // buffer size of channel.
        closed bool

        // sender-specific state
        ackCh chan bool // buffered with space for all the acks we need
        space int       // available space.

        // receiver-specific state
        sendCh chan reflect.Value // buffered channel of values received from other end.
        ed     *encDec            // so that we can send acks.
        count  int64              // number of values still to receive.
}

// Create a new netChan with the given name (only used for
// messages), id, direction, buffer size, and count.
// The connection to the other side is represented by ed.
func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
        c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
        if c.dir == Send {
                c.ackCh = make(chan bool, size)
                c.space = size
        }
        return c
}

// Close the channel.
func (nch *netChan) close() {
        if nch.closed {
                return
        }
        if nch.dir == Recv {
                if nch.sendCh != nil {
                        // If the sender goroutine is active, close the channel to it.
                        // It will close nch.ch when it can.
                        close(nch.sendCh)
                } else {
                        nch.ch.Close()
                }
        } else {
                nch.ch.Close()
                close(nch.ackCh)
        }
        nch.closed = true
}

// Send message from remote side to local receiver.
func (nch *netChan) send(val reflect.Value) {
        if nch.dir != Recv {
                panic("send on wrong direction of channel")
        }
        if nch.sendCh == nil {
                // If possible, do local send directly and ack immediately.
                if nch.ch.TrySend(val) {
                        nch.sendAck()
                        return
                }
                // Start sender goroutine to manage delayed delivery of values.
                nch.sendCh = make(chan reflect.Value, nch.size)
                go nch.sender()
        }
        select {
        case nch.sendCh <- val:
                // ok
        default:
                // TODO: should this be more resilient?
                panic("netchan: remote sender sent more values than allowed")
        }
}

// sendAck sends an acknowledgment that a message has left
// the channel's buffer. If the messages remaining to be sent
// will fit in the channel's buffer, then we don't
// need to send an ack.
func (nch *netChan) sendAck() {
        if nch.count < 0 || nch.count > int64(nch.size) {
                nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
        }
        if nch.count > 0 {
                nch.count--
        }
}

// The sender process forwards items from the sending queue
// to the destination channel, acknowledging each item.
func (nch *netChan) sender() {
        if nch.dir != Recv {
                panic("sender on wrong direction of channel")
        }
        // When Exporter.Hangup is called, the underlying channel is closed,
        // and so we may get a "too many operations on closed channel" error
        // if there are outstanding messages in sendCh.
        // Make sure that this doesn't panic the whole program.
        defer func() {
                if r := recover(); r != nil {
                        // TODO check that r is "too many operations", otherwise re-panic.
                }
        }()
        for v := range nch.sendCh {
                nch.ch.Send(v)
                nch.sendAck()
        }
        nch.ch.Close()
}

// Receive value from local side for sending to remote side.
func (nch *netChan) recv() (val reflect.Value, ok bool) {
        if nch.dir != Send {
                panic("recv on wrong direction of channel")
        }

        if nch.space == 0 {
                // Wait for buffer space.
                <-nch.ackCh
                nch.space++
        }
        nch.space--
        return nch.ch.Recv()
}

// acked is called when the remote side indicates that
// a value has been delivered.
func (nch *netChan) acked() {
        if nch.dir != Send {
                panic("recv on wrong direction of channel")
        }
        select {
        case nch.ackCh <- true:
                // ok
        default:
                // TODO: should this be more resilient?
                panic("netchan: remote receiver sent too many acks")
        }
}

Compare with Previous | Blame | View Log

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.