OpenCores
URL https://opencores.org/ocsvn/openrisc/openrisc/trunk

Subversion Repositories openrisc

[/] [openrisc/] [trunk/] [gnu-dev/] [or1k-gcc/] [libgo/] [go/] [old/] [netchan/] [export.go] - Blame information for rev 747

Details | Compare with Previous | View Log

Line No. Rev Author Line
1 747 jeremybenn
// Copyright 2010 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4
 
5
/*
6
        Package netchan implements type-safe networked channels:
7
        it allows the two ends of a channel to appear on different
8
        computers connected by a network.  It does this by transporting
9
        data sent to a channel on one machine so it can be recovered
10
        by a receive of a channel of the same type on the other.
11
 
12
        An exporter publishes a set of channels by name.  An importer
13
        connects to the exporting machine and imports the channels
14
        by name. After importing the channels, the two machines can
15
        use the channels in the usual way.
16
 
17
        Networked channels are not synchronized; they always behave
18
        as if they are buffered channels of at least one element.
19
*/
20
package netchan
21
 
22
// BUG: can't use range clause to receive when using ImportNValues to limit the count.
23
 
24
import (
25
        "errors"
26
        "io"
27
        "log"
28
        "net"
29
        "reflect"
30
        "strconv"
31
        "sync"
32
        "time"
33
)
34
 
35
// Export
36
 
37
// expLog logs args through the standard logger with the prefix
// "netchan export: ".
//
// When the first argument is a string the prefix is joined directly onto it,
// so the output is the same as before for all existing callers.  A missing or
// non-string first argument no longer panics: the prefix is emitted as its own
// leading operand instead.  The caller's slice is never modified, which
// matters when expLog is invoked as expLog(s...).
func expLog(args ...interface{}) {
	const prefix = "netchan export: "
	if len(args) == 0 {
		log.Print(prefix)
		return
	}
	// Work on a copy so a slice passed with "..." is left untouched.
	out := make([]interface{}, len(args), len(args)+1)
	copy(out, args)
	if first, ok := out[0].(string); ok {
		out[0] = prefix + first
	} else {
		out = append([]interface{}{prefix}, out...)
	}
	log.Print(out...)
}
42
 
43
// An Exporter allows a set of channels to be published on a single
44
// network port.  A single machine may have multiple Exporters
45
// but they must use different ports.
46
type Exporter struct {
47
        *clientSet
48
}
49
 
50
type expClient struct {
51
        *encDec
52
        exp     *Exporter
53
        chans   map[int]*netChan // channels in use by client
54
        mu      sync.Mutex       // protects remaining fields
55
        errored bool             // client has been sent an error
56
        seqNum  int64            // sequences messages sent to client; has value of highest sent
57
        ackNum  int64            // highest sequence number acknowledged
58
        seqLock sync.Mutex       // guarantees messages are in sequence, only locked under mu
59
}
60
 
61
// newClient builds the expClient that serves conn on behalf of exp.
// The sequence and acknowledgement counters start at zero and the
// channel table starts out empty.
func newClient(exp *Exporter, conn io.ReadWriter) *expClient {
	return &expClient{
		encDec: newEncDec(conn),
		exp:    exp,
		chans:  make(map[int]*netChan),
	}
}
70
 
71
// sendError logs err, transmits it to the client as a payError message
// under hdr, and marks the client as errored.
func (client *expClient) sendError(hdr *header, err string) {
	payload := &error_{err}
	expLog("sending error to client:", err)
	// Best effort: an encode failure is ignored in the hope that the
	// client receives the message anyway.
	client.encode(hdr, payError, payload)

	client.mu.Lock()
	client.errored = true
	client.mu.Unlock()
}
79
 
80
// newChan binds the exported channel called name to the client under the
// id carried in hdr and returns the resulting netChan.  If no channel of
// that name is exported, or it is exported with a direction other than dir,
// an error is sent to the client and nil is returned.
func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan {
	exporter := client.exp

	// Hold the exporter lock only for the map lookup.
	exporter.mu.Lock()
	published, found := exporter.names[name]
	exporter.mu.Unlock()

	switch {
	case !found:
		client.sendError(hdr, "no such channel: "+name)
		return nil
	case published.dir != dir:
		client.sendError(hdr, "wrong direction for channel: "+name)
		return nil
	}

	created := newNetChan(name, hdr.Id, published, client.encDec, size, count)
	client.chans[hdr.Id] = created
	return created
}
97
 
98
// getChan returns the channel the client registered under hdr.Id, provided
// it was set up with direction dir.
//
// It returns nil when no channel is registered under that id.  It also
// returns nil, after sending an error to the client, when the channel exists
// but has a different direction.  Previously the mismatched channel was
// still returned, so callers went on to use it in the wrong direction; this
// now matches newChan, which rejects a direction mismatch outright.
func (client *expClient) getChan(hdr *header, dir Dir) *netChan {
	nch := client.chans[hdr.Id]
	if nch == nil {
		return nil
	}
	if nch.dir != dir {
		client.sendError(hdr, "wrong direction for channel: "+nch.name)
		return nil
	}
	return nch
}
108
 
109
// The function run manages sends and receives for a single client.  For each
110
// (client Recv) request, this will launch a serveRecv goroutine to deliver
111
// the data for that channel, while (client Send) requests are handled as
112
// data arrives from the client.
113
func (client *expClient) run() {
114
        hdr := new(header)
115
        hdrValue := reflect.ValueOf(hdr)
116
        req := new(request)
117
        reqValue := reflect.ValueOf(req)
118
        error := new(error_)
119
        for {
120
                *hdr = header{}
121
                if err := client.decode(hdrValue); err != nil {
122
                        if err != io.EOF {
123
                                expLog("error decoding client header:", err)
124
                        }
125
                        break
126
                }
127
                switch hdr.PayloadType {
128
                case payRequest:
129
                        *req = request{}
130
                        if err := client.decode(reqValue); err != nil {
131
                                expLog("error decoding client request:", err)
132
                                break
133
                        }
134
                        if req.Size < 1 {
135
                                panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values")
136
                        }
137
                        switch req.Dir {
138
                        case Recv:
139
                                // look up channel before calling serveRecv to
140
                                // avoid a lock around client.chans.
141
                                if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil {
142
                                        go client.serveRecv(nch, *hdr, req.Count)
143
                                }
144
                        case Send:
145
                                client.newChan(hdr, Recv, req.Name, req.Size, req.Count)
146
                                // The actual sends will have payload type payData.
147
                                // TODO: manage the count?
148
                        default:
149
                                error.Error = "request: can't handle channel direction"
150
                                expLog(error.Error, req.Dir)
151
                                client.encode(hdr, payError, error)
152
                        }
153
                case payData:
154
                        client.serveSend(*hdr)
155
                case payClosed:
156
                        client.serveClosed(*hdr)
157
                case payAck:
158
                        client.mu.Lock()
159
                        if client.ackNum != hdr.SeqNum-1 {
160
                                // Since the sequence number is incremented and the message is sent
161
                                // in a single instance of locking client.mu, the messages are guaranteed
162
                                // to be sent in order.  Therefore receipt of acknowledgement N means
163
                                // all messages <=N have been seen by the recipient.  We check anyway.
164
                                expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
165
                        }
166
                        if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count.
167
                                client.ackNum = hdr.SeqNum
168
                        }
169
                        client.mu.Unlock()
170
                case payAckSend:
171
                        if nch := client.getChan(hdr, Send); nch != nil {
172
                                nch.acked()
173
                        }
174
                default:
175
                        log.Fatal("netchan export: unknown payload type", hdr.PayloadType)
176
                }
177
        }
178
        client.exp.delClient(client)
179
}
180
 
181
// Send all the data on a single channel to a client asking for a Recv.
182
// The header is passed by value to avoid issues of overwriting.
183
func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
184
        for {
185
                val, ok := nch.recv()
186
                if !ok {
187
                        if err := client.encode(&hdr, payClosed, nil); err != nil {
188
                                expLog("error encoding server closed message:", err)
189
                        }
190
                        break
191
                }
192
                // We hold the lock during transmission to guarantee messages are
193
                // sent in sequence number order.  Also, we increment first so the
194
                // value of client.SeqNum is the value of the highest used sequence
195
                // number, not one beyond.
196
                client.mu.Lock()
197
                client.seqNum++
198
                hdr.SeqNum = client.seqNum
199
                client.seqLock.Lock() // guarantee ordering of messages
200
                client.mu.Unlock()
201
                err := client.encode(&hdr, payData, val.Interface())
202
                client.seqLock.Unlock()
203
                if err != nil {
204
                        expLog("error encoding client response:", err)
205
                        client.sendError(&hdr, err.Error())
206
                        break
207
                }
208
                // Negative count means run forever.
209
                if count >= 0 {
210
                        if count--; count <= 0 {
211
                                break
212
                        }
213
                }
214
        }
215
}
216
 
217
// Receive and deliver locally one item from a client asking for a Send
218
// The header is passed by value to avoid issues of overwriting.
219
func (client *expClient) serveSend(hdr header) {
220
        nch := client.getChan(&hdr, Recv)
221
        if nch == nil {
222
                return
223
        }
224
        // Create a new value for each received item.
225
        val := reflect.New(nch.ch.Type().Elem()).Elem()
226
        if err := client.decode(val); err != nil {
227
                expLog("value decode:", err, "; type ", nch.ch.Type())
228
                return
229
        }
230
        nch.send(val)
231
}
232
 
233
// Report that client has closed the channel that is sending to us.
234
// The header is passed by value to avoid issues of overwriting.
235
func (client *expClient) serveClosed(hdr header) {
236
        nch := client.getChan(&hdr, Recv)
237
        if nch == nil {
238
                return
239
        }
240
        nch.close()
241
}
242
 
243
// unackedCount reports how many sequence numbers have been used for this
// client beyond the highest one acknowledged.
func (client *expClient) unackedCount() int64 {
	client.mu.Lock()
	defer client.mu.Unlock()
	return client.seqNum - client.ackNum
}
249
 
250
// seq returns the highest sequence number used so far for this client.
func (client *expClient) seq() int64 {
	client.mu.Lock()
	defer client.mu.Unlock()
	return client.seqNum
}
256
 
257
// ack returns the highest sequence number the client has acknowledged.
//
// It previously returned seqNum, the highest number sent, which made it
// indistinguishable from seq and hid any unacknowledged messages from
// callers comparing the two.
func (client *expClient) ack() int64 {
	client.mu.Lock()
	n := client.ackNum
	client.mu.Unlock()
	return n
}
263
 
264
// Serve waits for incoming connections on the listener
265
// and serves the Exporter's channels on each.
266
// It blocks until the listener is closed.
267
func (exp *Exporter) Serve(listener net.Listener) {
268
        for {
269
                conn, err := listener.Accept()
270
                if err != nil {
271
                        expLog("listen:", err)
272
                        break
273
                }
274
                go exp.ServeConn(conn)
275
        }
276
}
277
 
278
// ServeConn exports the Exporter's channels on conn.
279
// It blocks until the connection is terminated.
280
func (exp *Exporter) ServeConn(conn io.ReadWriter) {
281
        exp.addClient(conn).run()
282
}
283
 
284
// NewExporter creates a new Exporter that exports a set of channels.
285
func NewExporter() *Exporter {
286
        e := &Exporter{
287
                clientSet: &clientSet{
288
                        names:   make(map[string]*chanDir),
289
                        clients: make(map[unackedCounter]bool),
290
                },
291
        }
292
        return e
293
}
294
 
295
// ListenAndServe exports the exporter's channels through the
296
// given network and local address defined as in net.Listen.
297
func (exp *Exporter) ListenAndServe(network, localaddr string) error {
298
        listener, err := net.Listen(network, localaddr)
299
        if err != nil {
300
                return err
301
        }
302
        go exp.Serve(listener)
303
        return nil
304
}
305
 
306
// addClient creates a new expClient and records its existence
307
func (exp *Exporter) addClient(conn io.ReadWriter) *expClient {
308
        client := newClient(exp, conn)
309
        exp.mu.Lock()
310
        exp.clients[client] = true
311
        exp.mu.Unlock()
312
        return client
313
}
314
 
315
// delClient forgets the client existed
316
func (exp *Exporter) delClient(client *expClient) {
317
        exp.mu.Lock()
318
        delete(exp.clients, client)
319
        exp.mu.Unlock()
320
}
321
 
322
// Drain waits until all messages sent from this exporter/importer, including
323
// those not yet sent to any client and possibly including those sent while
324
// Drain was executing, have been received by the importer.  In short, it
325
// waits until all the exporter's messages have been received by a client.
326
// If the timeout is positive and Drain takes longer than that to complete,
327
// an error is returned.
328
func (exp *Exporter) Drain(timeout time.Duration) error {
329
        // This wrapper function is here so the method's comment will appear in godoc.
330
        return exp.clientSet.drain(timeout)
331
}
332
 
333
// Sync waits until all clients of the exporter have received the messages
334
// that were sent at the time Sync was invoked.  Unlike Drain, it does not
335
// wait for messages sent while it is running or messages that have not been
336
// dispatched to any client.  If the timeout is positive and Sync takes longer
337
// than that to complete, an error is returned.
338
func (exp *Exporter) Sync(timeout time.Duration) error {
339
        // This wrapper function is here so the method's comment will appear in godoc.
340
        return exp.clientSet.sync(timeout)
341
}
342
 
343
func checkChan(chT interface{}, dir Dir) (reflect.Value, error) {
344
        chanType := reflect.TypeOf(chT)
345
        if chanType.Kind() != reflect.Chan {
346
                return reflect.Value{}, errors.New("not a channel")
347
        }
348
        if dir != Send && dir != Recv {
349
                return reflect.Value{}, errors.New("unknown channel direction")
350
        }
351
        switch chanType.ChanDir() {
352
        case reflect.BothDir:
353
        case reflect.SendDir:
354
                if dir != Recv {
355
                        return reflect.Value{}, errors.New("to import/export with Send, must provide <-chan")
356
                }
357
        case reflect.RecvDir:
358
                if dir != Send {
359
                        return reflect.Value{}, errors.New("to import/export with Recv, must provide chan<-")
360
                }
361
        }
362
        return reflect.ValueOf(chT), nil
363
}
364
 
365
// Export exports a channel of a given type and specified direction.  The
366
// channel to be exported is provided in the call and may be of arbitrary
367
// channel type.
368
// Despite the literal signature, the effective signature is
369
//      Export(name string, chT chan T, dir Dir)
370
func (exp *Exporter) Export(name string, chT interface{}, dir Dir) error {
371
        ch, err := checkChan(chT, dir)
372
        if err != nil {
373
                return err
374
        }
375
        exp.mu.Lock()
376
        defer exp.mu.Unlock()
377
        _, present := exp.names[name]
378
        if present {
379
                return errors.New("channel name already being exported:" + name)
380
        }
381
        exp.names[name] = &chanDir{ch, dir}
382
        return nil
383
}
384
 
385
// Hangup disassociates the named channel from the Exporter and closes
386
// the channel.  Messages in flight for the channel may be dropped.
387
func (exp *Exporter) Hangup(name string) error {
388
        exp.mu.Lock()
389
        chDir, ok := exp.names[name]
390
        if ok {
391
                delete(exp.names, name)
392
        }
393
        // TODO drop all instances of channel from client sets
394
        exp.mu.Unlock()
395
        if !ok {
396
                return errors.New("netchan export: hangup: no such channel: " + name)
397
        }
398
        chDir.ch.Close()
399
        return nil
400
}

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.