OpenCores
URL https://opencores.org/ocsvn/openrisc/openrisc/trunk

Subversion Repositories openrisc

[/] [openrisc/] [trunk/] [gnu-dev/] [or1k-gcc/] [libgo/] [go/] [old/] [netchan/] [common.go] - Blame information for rev 867

Go to most recent revision | Details | Compare with Previous | View Log

Line No. Rev Author Line
1 747 jeremybenn
// Copyright 2010 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4
 
5
package netchan
6
 
7
import (
8
        "encoding/gob"
9
        "errors"
10
        "io"
11
        "reflect"
12
        "sync"
13
        "time"
14
)
15
 
16
// Dir is the direction of a connection, expressed from the
// client's point of view.
type Dir int

// The two possible directions. Recv is the zero value.
const (
	Recv Dir = iota
	Send
)

// String returns "Recv" or "Send" for the two known directions and
// "???" for any other value.
func (dir Dir) String() string {
	if dir == Recv {
		return "Recv"
	}
	if dir == Send {
		return "Send"
	}
	return "???"
}
33
 
34
// Payload types. The value is carried in header.PayloadType and tells
// the reader what, if anything, follows the header on the wire.
const (
	// payRequest: a request structure follows.
	payRequest = iota
	// payError: an error structure follows.
	payError
	// payData: a user payload follows.
	payData
	// payAck: acknowledgement; no payload follows.
	payAck
	// payClosed: the channel is now closed.
	payClosed
	// payAckSend: the payload has been delivered.
	payAckSend
)
43
 
44
// header prefixes every transmission. Depending on PayloadType it is
// followed by a request structure, an error structure, or an arbitrary
// user payload structure.
type header struct {
	Id          int   // identifies the channel the message belongs to (see netChan.id)
	PayloadType int   // one of the pay* constants
	SeqNum      int64 // sequence number of the message
}
51
 
52
// Sent with a header once per channel from importer to exporter to report
53
// that it wants to bind to a channel with the specified direction for count
54
// messages, with space for size buffered values. If count is -1, it means unlimited.
55
type request struct {
56
        Name  string
57
        Count int64
58
        Size  int
59
        Dir   Dir
60
}
61
 
62
// error_ is the payload sent after a header to report an error;
// the Error field carries the message text.
type error_ struct {
	Error string
}
66
 
67
// unackedCounter lets the import and export sides share one
// implementation of acknowledgement tracking. clientSet.drain polls
// unackedCount, and clientSet.sync compares ack against a snapshot of
// seq.
type unackedCounter interface {
	// unackedCount is treated by drain as the number of messages
	// still awaiting acknowledgement; any value > 0 means "pending".
	unackedCount() int64
	// ack is compared by sync against an earlier seq value.
	ack() int64
	// seq is sampled by sync on entry.
	seq() int64
}
73
 
74
// A channel and its direction.
75
type chanDir struct {
76
        ch  reflect.Value
77
        dir Dir
78
}
79
 
80
// clientSet contains the objects and methods needed for tracking
81
// clients of an exporter and draining outstanding messages.
82
type clientSet struct {
83
        mu      sync.Mutex // protects access to channel and client maps
84
        names   map[string]*chanDir
85
        clients map[unackedCounter]bool
86
}
87
 
88
// encDec bundles a gob decoder and a gob encoder, each protected by
// its own mutex.
type encDec struct {
	decLock sync.Mutex   // held while dec is in use
	dec     *gob.Decoder // reads values from the connection
	encLock sync.Mutex   // held while enc is in use
	enc     *gob.Encoder // writes values to the connection
}
95
 
96
func newEncDec(conn io.ReadWriter) *encDec {
97
        return &encDec{
98
                dec: gob.NewDecoder(conn),
99
                enc: gob.NewEncoder(conn),
100
        }
101
}
102
 
103
// Decode an item from the connection.
104
func (ed *encDec) decode(value reflect.Value) error {
105
        ed.decLock.Lock()
106
        err := ed.dec.DecodeValue(value)
107
        if err != nil {
108
                // TODO: tear down connection?
109
        }
110
        ed.decLock.Unlock()
111
        return err
112
}
113
 
114
// Encode a header and payload onto the connection.
115
func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error {
116
        ed.encLock.Lock()
117
        hdr.PayloadType = payloadType
118
        err := ed.enc.Encode(hdr)
119
        if err == nil {
120
                if payload != nil {
121
                        err = ed.enc.Encode(payload)
122
                }
123
        }
124
        if err != nil {
125
                // TODO: tear down connection if there is an error?
126
        }
127
        ed.encLock.Unlock()
128
        return err
129
}
130
 
131
// See the comment for Exporter.Drain.
132
func (cs *clientSet) drain(timeout time.Duration) error {
133
        deadline := time.Now().Add(timeout)
134
        for {
135
                pending := false
136
                cs.mu.Lock()
137
                // Any messages waiting for a client?
138
                for _, chDir := range cs.names {
139
                        if chDir.ch.Len() > 0 {
140
                                pending = true
141
                        }
142
                }
143
                // Any unacknowledged messages?
144
                for client := range cs.clients {
145
                        n := client.unackedCount()
146
                        if n > 0 { // Check for > rather than != just to be safe.
147
                                pending = true
148
                                break
149
                        }
150
                }
151
                cs.mu.Unlock()
152
                if !pending {
153
                        break
154
                }
155
                if timeout > 0 && time.Now().After(deadline) {
156
                        return errors.New("timeout")
157
                }
158
                time.Sleep(100 * time.Millisecond)
159
        }
160
        return nil
161
}
162
 
163
// See the comment for Exporter.Sync.
164
func (cs *clientSet) sync(timeout time.Duration) error {
165
        deadline := time.Now().Add(timeout)
166
        // seq remembers the clients and their seqNum at point of entry.
167
        seq := make(map[unackedCounter]int64)
168
        cs.mu.Lock()
169
        for client := range cs.clients {
170
                seq[client] = client.seq()
171
        }
172
        cs.mu.Unlock()
173
        for {
174
                pending := false
175
                cs.mu.Lock()
176
                // Any unacknowledged messages?  Look only at clients that existed
177
                // when we started and are still in this client set.
178
                for client := range seq {
179
                        if _, ok := cs.clients[client]; ok {
180
                                if client.ack() < seq[client] {
181
                                        pending = true
182
                                        break
183
                                }
184
                        }
185
                }
186
                cs.mu.Unlock()
187
                if !pending {
188
                        break
189
                }
190
                if timeout > 0 && time.Now().After(deadline) {
191
                        return errors.New("timeout")
192
                }
193
                time.Sleep(100 * time.Millisecond)
194
        }
195
        return nil
196
}
197
 
198
// A netChan represents a channel imported or exported
199
// on a single connection. Flow is controlled by the receiving
200
// side by sending payAckSend messages when values
201
// are delivered into the local channel.
202
type netChan struct {
203
        *chanDir
204
        name   string
205
        id     int
206
        size   int // buffer size of channel.
207
        closed bool
208
 
209
        // sender-specific state
210
        ackCh chan bool // buffered with space for all the acks we need
211
        space int       // available space.
212
 
213
        // receiver-specific state
214
        sendCh chan reflect.Value // buffered channel of values received from other end.
215
        ed     *encDec            // so that we can send acks.
216
        count  int64              // number of values still to receive.
217
}
218
 
219
// Create a new netChan with the given name (only used for
220
// messages), id, direction, buffer size, and count.
221
// The connection to the other side is represented by ed.
222
func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
223
        c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
224
        if c.dir == Send {
225
                c.ackCh = make(chan bool, size)
226
                c.space = size
227
        }
228
        return c
229
}
230
 
231
// Close the channel.
232
func (nch *netChan) close() {
233
        if nch.closed {
234
                return
235
        }
236
        if nch.dir == Recv {
237
                if nch.sendCh != nil {
238
                        // If the sender goroutine is active, close the channel to it.
239
                        // It will close nch.ch when it can.
240
                        close(nch.sendCh)
241
                } else {
242
                        nch.ch.Close()
243
                }
244
        } else {
245
                nch.ch.Close()
246
                close(nch.ackCh)
247
        }
248
        nch.closed = true
249
}
250
 
251
// Send message from remote side to local receiver.
252
func (nch *netChan) send(val reflect.Value) {
253
        if nch.dir != Recv {
254
                panic("send on wrong direction of channel")
255
        }
256
        if nch.sendCh == nil {
257
                // If possible, do local send directly and ack immediately.
258
                if nch.ch.TrySend(val) {
259
                        nch.sendAck()
260
                        return
261
                }
262
                // Start sender goroutine to manage delayed delivery of values.
263
                nch.sendCh = make(chan reflect.Value, nch.size)
264
                go nch.sender()
265
        }
266
        select {
267
        case nch.sendCh <- val:
268
                // ok
269
        default:
270
                // TODO: should this be more resilient?
271
                panic("netchan: remote sender sent more values than allowed")
272
        }
273
}
274
 
275
// sendAck sends an acknowledgment that a message has left
276
// the channel's buffer. If the messages remaining to be sent
277
// will fit in the channel's buffer, then we don't
278
// need to send an ack.
279
func (nch *netChan) sendAck() {
280
        if nch.count < 0 || nch.count > int64(nch.size) {
281
                nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
282
        }
283
        if nch.count > 0 {
284
                nch.count--
285
        }
286
}
287
 
288
// The sender process forwards items from the sending queue
289
// to the destination channel, acknowledging each item.
290
func (nch *netChan) sender() {
291
        if nch.dir != Recv {
292
                panic("sender on wrong direction of channel")
293
        }
294
        // When Exporter.Hangup is called, the underlying channel is closed,
295
        // and so we may get a "too many operations on closed channel" error
296
        // if there are outstanding messages in sendCh.
297
        // Make sure that this doesn't panic the whole program.
298
        defer func() {
299
                if r := recover(); r != nil {
300
                        // TODO check that r is "too many operations", otherwise re-panic.
301
                }
302
        }()
303
        for v := range nch.sendCh {
304
                nch.ch.Send(v)
305
                nch.sendAck()
306
        }
307
        nch.ch.Close()
308
}
309
 
310
// Receive value from local side for sending to remote side.
311
func (nch *netChan) recv() (val reflect.Value, ok bool) {
312
        if nch.dir != Send {
313
                panic("recv on wrong direction of channel")
314
        }
315
 
316
        if nch.space == 0 {
317
                // Wait for buffer space.
318
                <-nch.ackCh
319
                nch.space++
320
        }
321
        nch.space--
322
        return nch.ch.Recv()
323
}
324
 
325
// acked is called when the remote side indicates that
326
// a value has been delivered.
327
func (nch *netChan) acked() {
328
        if nch.dir != Send {
329
                panic("recv on wrong direction of channel")
330
        }
331
        select {
332
        case nch.ackCh <- true:
333
                // ok
334
        default:
335
                // TODO: should this be more resilient?
336
                panic("netchan: remote receiver sent too many acks")
337
        }
338
}

powered by: WebSVN 2.1.0

© copyright 1999-2025 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.