OpenCores
URL https://opencores.org/ocsvn/pc_fpga_com/pc_fpga_com/trunk

Subversion Repositories pc_fpga_com

[/] [pc_fpga_com/] [trunk/] [PC_FPGA_PLATFPORM/] [SOFTWARE/] [background_reader.cpp] - Blame information for rev 2

Details | Compare with Previous | View Log

Line No. Rev Author Line
1 2 NikosAl
/*
2
 * Copyright (C) 2011 Simon A. Berger
3
 *
4
 *  This program is free software; you may redistribute it and/or modify its
5
 *  under the terms of the GNU Lesser General Public License as published by the Free
6
 *  Software Foundation; either version 2 of the License, or (at your option)
7
 *  any later version.
8
 *
9
 *  This program is distributed in the hope that it will be useful, but
10
 *  WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11
 *  or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12
 *  for more details.
13
 */
14
 
15
#include <cstdio>
16
#include <csignal>
17
#include <sys/socket.h>
18
#include <boost/thread/locks.hpp>
19
#include <endian.h>
20
 
21
#include "background_reader.h"
22
#include "fpga_com.h"
23
 
24
static void handler( int signal ) {
25
    //printf( "signal: %d\n", signal );
26
}
27
 
28
background_reader::background_reader(int s, size_t size)
29
        : m_stop(false),
30
        m_socket(s),
31
        m_max_size(size),
32
        m_pq_mem(0),
33
        m_pq_max_mem(0),
34
        m_pq_max_depth(0),
35
        N_THREADS(1),
36
        m_run_barrier(N_THREADS + 1),
37
        m_threads_joined(false)
38
        //m_native_handle(0)
39
{
40
 
41
}
42
 
43
background_reader::~background_reader()
44
{
45
    if( !m_threads_joined ) {
46
        printf( "WARNING: background_reader::~background_reader(): threads not joined!\n" );
47
    }
48
}
49
 
50
 
51
void background_reader::start() {
52
    /// m_thread.reset( new boost::thread( boost::bind(&background_reader::run, this)));
53
 
54
 
55
 
56
    printf( "starting %zd background reader threads\n", N_THREADS );
57
    //boost::barrier run_barrier( N_THREADS + 1 );
58
 
59
    while( m_thread_group.size() < N_THREADS ) {
60
        m_thread_group.create_thread(boost::bind(&background_reader::run, this ));
61
    }
62
 
63
    m_run_barrier.wait();
64
}
65
 
66
void background_reader::run()
67
{
68
    // BIG-UGLY-KLUDGE-WARNING!
69
    // we have to install a signal handler, in order to disable the auto-restart behaviour of the recv call...
70
    // This is necessary to make the recv call interuptable (= return -1 on interrupt). Otherwise the
71
    // bg threads would either wait forever, or we would need at least two syscalls per recv (any solution that involves select).
72
    //
73
    // the 'handler' does nothing, as we are only interested in interrupting the recv...
74
 
75
    struct sigaction sa;
76
    sa.sa_handler = handler;
77
    sigemptyset(&sa.sa_mask);
78
    sa.sa_flags = 0;
79
 
80
    sigaction( SIGUSR1, &sa, 0 );
81
 
82
    //m_native_handle = pthread_self();
83
 
84
    {
85
        lock_guard_t lock( m_nh_mtx );
86
        m_native_handles.push_back( pthread_self() );
87
 
88
    }
89
 
90
    const size_t MTU = 64 * 1024;
91
 
92
    std::vector<char>bufs(MTU);
93
    char *buf = bufs.data();
94
 
95
    //run_barrier->wait();
96
    //run_barrier = 0;
97
    m_run_barrier.wait();
98
 
99
    while ( !m_stop ) {
100
        ssize_t s = recv( m_socket, buf, MTU, 0 );
101
 
102
//         if( s < 1024 && s != -1 ) {
103
//             printf( "recved term packet %p\n", (void*)pthread_self() );
104
//         }
105
        //printf( "recv: %zd\n", s );
106
        if ( s == -1 && errno == EINTR ) {
107
             printf( "interrupted\n" );
108
 
109
        } else {
110
            lock_guard_t lock( m_pq_mtx );
111
            m_pq.push_back(std::string(buf, buf+s));
112
            m_pq_mem += s;
113
            m_pq_max_mem = std::max( m_pq_mem, m_pq_max_mem );
114
 
115
 
116
            m_pq_max_depth = std::max( m_pq.size(), m_pq_max_depth );
117
        }
118
//         printf( "notify\n" );
119
        m_can_read_condition.notify_one();
120
    }
121
 
122
    printf( "background reader thread exit\n" );
123
}
124
 
125
ssize_t background_reader::block_recv(void* buf, size_t size)
126
{
127
    boost::unique_lock<mutex_t> lock( m_pq_mtx );
128
 
129
    while ( ! m_pq.size() > 0 && !m_stop ) {
130
//         printf( "cond_wait\n" );
131
        m_can_read_condition.wait(lock);
132
    }
133
 
134
    if( m_pq.size() == 0 ) {
135
        return -1;
136
    }
137
 
138
    std::string &pbuf = m_pq.front();
139
 
140
    m_pq_mem -= pbuf.size();
141
 
142
 
143
    ssize_t s = std::min( size, pbuf.size() );
144
    std::copy( pbuf.begin(), pbuf.end(), (char *)buf );
145
    m_pq.pop_front();
146
 
147
    return s;
148
}
149
 
150
 
151
ssize_t background_reader::poll()
152
{
153
    lock_guard_t lock( m_pq_mtx );
154
    if( m_pq.size() > 0 ) {
155
        return m_pq.front().size();
156
    } else {
157
        return -1;
158
    }
159
}
160
 
161
 
162
void background_reader::interrupt() {
163
    //boost::thread::native_handle_type h = m_thread->native_handle();
164
   // pthread_kill( m_native_handle, SIGUSR1 );
165
 
166
    lock_guard_t lock( m_nh_mtx );
167
    for( std::vector<pthread_t>::iterator it = m_native_handles.begin(); it != m_native_handles.end(); ++it ) {
168
        pthread_kill( *it, SIGUSR1 );
169
    }
170
 
171
}
172
 
173
void background_reader::join() {
174
   // m_thread->join();
175
   m_thread_group.join_all();
176
 
177
   printf( "pq max size (bytes): %zd\n", m_pq_max_mem );
178
   printf( "pq max depth (#packets): %zd\n", m_pq_max_depth );
179
 
180
   m_threads_joined = true;
181
}
182
 
183
ssize_t background_reader::purge()
184
{
185
    lock_guard_t lock( m_pq_mtx );
186
 
187
    ssize_t s = m_pq.size();
188
    m_pq.clear();
189
 
190
    return s;
191
}
192
 
193
 
194
 
195
// int mainx() {
196
//     fpga_con_t fc;
197
// 
198
//     fpga_con_init( &fc, "131.159.28.113", 12340, 12350 );
199
// 
200
// 
201
// 
202
//     background_reader bgr( fc.s, 1024 * 1024 * 10 );
203
// 
204
//     bgr.start();
205
// 
206
//     char buf[1024];
207
//     memset( buf, 0, 1024 );
208
//     
209
//     
210
//     const size_t rbuf_size = 10 * 1024;
211
//     char rbuf[rbuf_size];
212
// 
213
//     printf( "sleep\n" );
214
//   //  usleep( 2000000 );
215
//     printf( "close\n" );
216
//     //close( fc.s );
217
// 
218
//     //bgr.stop();
219
//     //bgr.interrupt();
220
//     //getchar();
221
// 
222
//    // bgr.join();
223
// 
224
//     printf( "joined\n" );
225
//     size_t n = 0;
226
//     for ( int i = 0; i < 10; i++ ) {
227
//         fpga_con_send( &fc, buf, 1024 );
228
//         printf( "sent\n" );
229
// 
230
// 
231
//         
232
//         while (true) {
233
//             size_t s = bgr.block_recv( rbuf, rbuf_size );
234
//             n++;
235
// //             printf( "recv: %zd\n", s );
236
//             if ( s < 1024 ) {
237
//                 break;
238
//             }
239
//         }
240
// 
241
//         printf( "recved: %zd\n", n );
242
//     }
243
//     
244
//     bgr.stop();
245
//     bgr.interrupt();
246
//     bgr.join();
247
//     printf( "bg reader joined. exit.\n" );
248
// }
249
 
250
 
251
 
252
static inline background_reader &get_bgr( fpga_bgr_t *cbgr ) {
253
    assert( cbgr != 0 );
254
    assert( cbgr->bgr_cpp != 0 );
255
    return *(static_cast<background_reader*>(cbgr->bgr_cpp));
256
}
257
 
258
void fpga_bgr_init( fpga_bgr_t *cbgr, int socket, size_t size ) {
259
    memset( cbgr, 0, sizeof( fpga_bgr_t ));
260
 
261
    background_reader *bgr = new background_reader(socket, size);
262
    cbgr->bgr_cpp = bgr;
263
}
264
 
265
void fpga_bgr_delete( fpga_bgr_t *cbgr ) {
266
 
267
    background_reader *bgr = static_cast<background_reader*>(cbgr->bgr_cpp);
268
    delete bgr;
269
 
270
}
271
 
272
void fpga_bgr_start( fpga_bgr_t *cbgr) {
273
    background_reader &bgr = get_bgr(cbgr);
274
    bgr.start();
275
}
276
ssize_t fpga_bgr_block_recv( fpga_bgr_t *cbgr, void *buf, size_t size ) {
277
    background_reader &bgr = get_bgr(cbgr);
278
 
279
    return bgr.block_recv(buf, size);
280
}
281
ssize_t fpga_bgr_poll( fpga_bgr_t *cbgr ) {
282
    background_reader &bgr = get_bgr(cbgr);
283
 
284
    return bgr.poll();
285
 
286
}
287
void fpga_bgr_stop_interrupt_join( fpga_bgr_t *cbgr) {
288
    background_reader &bgr = get_bgr(cbgr);
289
 
290
    bgr.stop();
291
    bgr.interrupt();
292
    bgr.join();
293
}
294
 
295
 
296
// template <typename T>
297
// inline static T swap_endian( T &v ) {
298
//     
299
//     T vo = v;
300
//     uint8_t *vb = (uint8_t*) &vo;
301
//     
302
//     for( size_t i = 0; i < sizeof(T) / 2; i++ ) {
303
//         std::swap(vb[i], vb[sizeof(T) - i - 1]);
304
//         
305
//     }
306
//     return vo;
307
// }
308
 
309
 
310
template<typename T,size_t N>
311
struct swap_endian {
312
    inline T operator()( T &v ) {
313
 
314
        T vo = v;
315
        uint8_t *vb = (uint8_t*) &vo;
316
 
317
        for( size_t i = 0; i < sizeof(T) / 2; i++ ) {
318
            std::swap(vb[i], vb[sizeof(T) - i - 1]);
319
        }
320
        return vo;
321
    }
322
};
323
 
324
 
325
template<typename T>
326
struct swap_endian<T,2> {
327
    inline T operator()( T &v ) {
328
        uint16_t t = __bswap_16(*((uint16_t*)&v));
329
 
330
        T* r = (T*)&t; // we don't want to dereference a type-punned pointer, do we?
331
        return *r;
332
 
333
    }
334
};
335
 
336
 
337
template<typename T>
338
struct swap_endian<T,4> {
339
    inline T operator()( T &v ) {
340
        uint32_t t = __bswap_32(*((uint32_t*)&v));
341
 
342
        T* r = (T*)&t;
343
        return *r;
344
    }
345
};
346
 
347
template<typename T>
348
struct swap_endian<T,8> {
349
    inline T operator()( T &v ) {
350
        uint64_t t = __bswap_64(*((uint64_t*)&v));
351
 
352
        T* r = (T*)&t;
353
        return *r;
354
    }
355
};
356
 
357
 
358
 
359
 
360
 
361
template<typename T>
362
struct swappy_au {
363
    const static size_t N = sizeof(T);
364
 
365
 
366
    swap_endian<T,N> swap;
367
    inline void operator()( void * dest, void * src, size_t n ) {
368
        std::copy( (char *)src, ((char *)src) + n * N, (char *)dest );
369
        //std::memcpy( dest, src, n * N );
370
 
371
        T * ptr = (T *)dest;
372
        T * end = ptr + n;
373
 
374
        // do the endian swapping in place in the aligned buffer
375
        while( ptr < end ) {
376
            *ptr = swap( *ptr );
377
            ptr++;
378
        }
379
 
380
    }
381
 
382
};
383
 
384
template<typename T>
385
struct swappy_aa {
386
    const static size_t N = sizeof(T);
387
    swap_endian<T,N> swap;
388
    inline void operator()( void * dest, void * src, size_t n ) {
389
 
390
 
391
        T * sptr = (T *)src;
392
        T * ptr = (T *)dest;
393
        T * end = ptr + n;
394
 
395
        // copy and swap on the fly, assuming that both buffers are aligned
396
        while( ptr < end ) {
397
            *ptr = swap( *sptr );
398
            ptr++;
399
            sptr++;
400
        }
401
 
402
    }
403
 
404
};
405
 
406
 
407
template<typename T>
408
struct xerox_plain {
409
    const static size_t N = sizeof(T);
410
    inline void operator()( void * dest, void * src, size_t n ) {
411
        std::copy( (char*)src, ((char*)src) + n * N, (char*)dest ); // using char* here to prevent std;:copy form making any assumptions about th alignment of src/dest
412
        //std::copy( (T*)src, ((T*)src) + n, (T*)dest );
413
    }
414
};
415
 
416
 
417
// template <typename T>
418
// inline T passthrough( T &v ) {
419
//  
420
//     return v;
421
// }
422
// 
423
 
424
template <typename T,class CopyF>
425
static bool fpga_bgr_recv_genv( fpga_bgr_t *cbgr, T *buf, size_t n, char ht ) {
426
    background_reader &bgr = get_bgr(cbgr);
427
 
428
 
429
 
430
    T *buf_end = buf + n;
431
    T *ptr = buf;
432
 
433
    const size_t MPU = 9000;
434
    uint8_t rbuf[MPU];
435
 
436
    CopyF xerox;
437
 
438
    while( ptr < buf_end ) {
439
 
440
 
441
        ssize_t raw_size = bgr.block_recv(rbuf, MPU);
442
//         printf( "ptr: %p %zd %d %d %d %d\n", ptr, size, rbuf[0], rbuf[1], rbuf[2], rbuf[3] );
443
        assert( raw_size > 1 );
444
 
445
        if( rbuf[0] != ht ) {
446
            printf( "drop wrong packet type: %d %d\n", rbuf[0], ht );
447
        }
448
 
449
        ssize_t size = raw_size - 1;
450
        uint8_t *rptr = rbuf + 1;
451
 
452
        ssize_t ne = size / sizeof(T);
453
        ssize_t left = buf_end - buf;
454
 
455
        ssize_t to_copy = std::min( ne, left );
456
        xerox( ptr, rptr, to_copy );
457
 
458
 
459
        ptr += to_copy;
460
    }
461
 
462
    return true;
463
}
464
 
465
 
466
int fpga_bgr_recv_charv( fpga_bgr_t *bgr, char *buf, size_t n ) {
467
    return fpga_bgr_recv_genv<char,xerox_plain<char> >( bgr, buf, n, FPC_CODE_CHAR );
468
}
469
 
470
int fpga_bgr_recv_shortv( fpga_bgr_t *bgr, int16_t *buf, size_t n ) {
471
    return fpga_bgr_recv_genv<int16_t,swappy_au<int16_t> >( bgr, buf, n, FPC_CODE_SHORT );
472
}
473
 
474
int fpga_bgr_recv_intv( fpga_bgr_t *bgr, int32_t *buf, size_t n ) {
475
    return fpga_bgr_recv_genv<int32_t,swappy_au<int32_t> >( bgr, buf, n, FPC_CODE_INT );
476
}
477
 
478
int fpga_bgr_recv_longv( fpga_bgr_t *bgr, int64_t *buf, size_t n ) {
479
    return fpga_bgr_recv_genv<int64_t,swappy_au<int64_t> >( bgr, buf, n, FPC_CODE_LONG );
480
}
481
 
482
int fpga_bgr_recv_floatv( fpga_bgr_t *bgr, float *buf, size_t n ) {
483
    return fpga_bgr_recv_genv<float,swappy_au<float> >( bgr, buf, n, FPC_CODE_FLOAT );
484
}
485
 
486
int fpga_bgr_recv_doublev( fpga_bgr_t *bgr, double *buf, size_t n ) {
487
    //return fpga_bgr_recv_genv<double,xerox_plain<double,8> >( bgr, buf, n );
488
    return fpga_bgr_recv_genv<double,swappy_au<double> >( bgr, buf, n, FPC_CODE_DOUBLE );
489
}
490
 
491
#if 0
492
 
493
 
494
int main() {
495
    fpga_con_t fc;
496
 
497
    fpga_con_init( &fc, "131.159.28.113", 12340, 12350 );
498
 
499
    fpga_bgr_t bgr;
500
 
501
    fpga_bgr_init( &bgr, fc.s, 1024 * 1024 * 10 );
502
 
503
    fpga_bgr_start( &bgr );
504
 
505
    char buf[1024];
506
    memset( buf, 0, 1024 );
507
    int ibuf[1000];
508
    std::fill( ibuf, ibuf + 1000, 666 );
509
 
510
 
511
    const size_t rbuf_size = 10 * 1024;
512
    char rbuf[rbuf_size];
513
 
514
//     printf( "sleep\n" );
515
  //  usleep( 2000000 );
516
//     printf( "close\n" );
517
    //close( fc.s );
518
 
519
    //bgr.stop();
520
    //bgr.interrupt();
521
    //getchar();
522
 
523
   // bgr.join();
524
 
525
//     printf( "joined\n" );
526
    size_t n = 0;
527
    for ( int i = 0; i < 1; i++ ) {
528
        //fpga_con_send( &fc, buf, 1024 );
529
        fpga_con_send_intv( &fc, ibuf, 1000 );
530
        printf( "sent\n" );
531
 
532
        double iv[1000];
533
        bool succ = fpga_bgr_recv_doublev( &bgr, iv, 1000 );
534
 
535
        for( int i = 0; i < 1000; i++ ) {
536
           printf( "%.2f ", iv[i] );
537
            if( i % 20 == 19 ) {
538
                printf( "\n" );
539
            }
540
        }
541
        printf( "\n" );
542
 
543
        printf( "recved: %d\n", succ );
544
    }
545
 
546
    fpga_bgr_stop_interrupt_join( &bgr );
547
    printf( "bg reader joined. exit.\n" );
548
 
549
    fpga_bgr_delete( &bgr );
550
}
551
#endif

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.