[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: LDM 5.0 vs 5.0.6 AFOS


  • Subject: Re: LDM 5.0 vs 5.0.6 AFOS
  • Date: Tue, 2 Mar 1999 12:30:00 -0700 (MST)

David,


The pqing.c shipped with 5.0.6 had an AFOS bug in it. I'm including the
fixed pqing.c code; this should work for you.

Robb...



On Tue, 2 Mar 1999, David Magee wrote:

> 
> I've got a Sun Sparc 2 with a NOAA WeatherWire feed into ttya.
> 
> Under ldm 5.0, the system works flawlessly.
> 
> Under ldm 5.0.6, no data is seen coming in by LDM.  
> 
> In ldmd.conf:
> exec    "afos -v -b 4800 -p none /dev/ttya"
> 
> I had similar problems trying to upgrade from 5.0 to 5.0.2
> 
> Any suggestions?  Am I stuck at 5.0 ?  I believe the problem lies within
> pqing.
> 
> 
> 
> Thanks-
> 
> David
> 
> 
> 
> 
> --
> 
> David K. Magee
> address@hidden
> 
> 
> 

===============================================================================
Robb Kambic                                Unidata Program Center
Software Engineer III                      Univ. Corp for Atmospheric Research
address@hidden             WWW: http://www.unidata.ucar.edu/
===============================================================================
/*
 *   Copyright 1993, University Corporation for Atmospheric Research
 *   See ../COPYRIGHT file for copying and redistribution conditions.
 */
/* $Id: pqing.c,v 1.66 1999/02/09 03:17:09 davis Exp $ */
/*
 * RCS revision plus compile date and time of this file.
 * main() writes it to the log at debug level during startup.
 */
static char version[] =
"$Revision: 1.66 $ built "__DATE__" "__TIME__;

#include <ldmconfig.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <limits.h> /* PATH_MAX */
#ifndef PATH_MAX
#define PATH_MAX 255
#endif /* !PATH_MAX */
#include <sys/types.h>
#include <sys/time.h>
#include <rpc/rpc.h>
#include <errno.h>

#include "ulog.h"
#include "inetutil.h"
#include "ldm.h"
#include "feed.h"
#include "atofeedt.h"
#include "ldmprint.h"
#include "wmo_message.h"
#include "afos_message.h"
#include "faa604_message.h"
#include "paths.h"
#include "pq.h"
#include "md5.h"

#ifdef NO_ATEXIT
#include "atexit.h"
#endif

#if NET
/*
 * TCP reconnect policy used by the main loop: after the k-th consecutive
 * failed open it sleeps k * RETRY_DELAY seconds, and it stops the program
 * once the retry counter exceeds MAX_RETRIES.
 */
#define RETRY_DELAY     (10)            /* delay factor between retries */
#define MAX_RETRIES     (30)
#endif /* NET */

/* set by command line */
/*
 * baud and parity receive per-feedtype defaults in setFeedDefaults() and
 * are overridden by -b and -p; rawfname is set by -r.  None of the three
 * is read again in this file; they have external linkage, presumably for
 * the feed-opening code -- TODO confirm.
 */
char *baud = NULL;
char *rawfname = NULL;
char *parity = NULL;
/* -c selects CHK_CHECK, -n selects CHK_DONT; consulted when main() picks a scanner */
enum { CHK_UNSET, CHK_CHECK, CHK_DONT} chkflag = CHK_UNSET;
/* feedtype stamped on every product; -f may override the av[0] deduction */
static feedtypet feedtype = NONE; /* deduce from av[0]  */
/* basename of av[0], set by whatami(); "feedtest" disables the product queue */
static const char *progname = NULL;
/* -l argument, handed to openulog(); "-" means log to stderr */
static char *logfname = "";
/* last command-line argument, handed to open_feed() */
static char feedfname[PATH_MAX];
/* result of ghostname(); used as the origin of every product */
static char myname[HOSTNAMESIZE];
/* product queue path: DEFAULT_QUEUE, then $LDMPQFNAME, then -q */
static const char *pqfname = DEFAULT_QUEUE;

/* open product queue; stays NULL when running as "feedtest" */
static pqueue *pq = NULL;
/* input descriptor filled in by open_feed(); -1 while no feed is open */
static int ifd = -1; 

/* flags written by signal_handler() and polled by main() / cleanup() */
static volatile int done = 0;           /* SIGTERM: leave the main loop */
static volatile int intr = 0;           /* SIGINT: exit() was called from the handler */
static volatile int stats_req = 0;      /* SIGUSR1: log statistics on next iteration */

/* per-product-format statistics reporter; main() swaps it for AFOS and FAA604 */
static void (*prod_stats)(void) = wmo_stats;
/* number of inserts the queue rejected with PQUEUE_DUP */
static unsigned long ndups = 0;

/* MD5 context allocated once in main() and reused by toClients() */
static MD5_CTX *md5ctxp = NULL;

#if NET
/* set on SIGPIPE, read failure, failed keep-alive write, or idle timeout */
static int port_error = 0;      /* indicate sigpipe condition was raised */
#       ifndef DEFAULT_RESET_SECS
#       define DEFAULT_RESET_SECS 600
#       endif
/* idle seconds before a forced TCP reconnect; -T overrides, 0 disables */
static int reset_secs = DEFAULT_RESET_SECS;
#endif

/*
 * Exit handler, registered with atexit() in main().
 *
 * "Exiting" is logged and the logger is closed on every exit.  All other
 * work is skipped when `intr' is set, i.e. when exit() was called from
 * signal_handler() on SIGINT:
 *   - free the MD5 context;
 *   - if a product queue is open: fetch its high-water marks, close it,
 *     close the feed and log queue usage and the duplicate count;
 *   - log product and feed statistics.
 *
 * This handler can run before any feed has been opened (for example when
 * pq_open() fails in main()), so the feed hooks are only called when set.
 */
static void
cleanup(void)
{
        unotice("Exiting"); 
        if(!intr)
        {
                /* We are not in the interrupt context */

                if(md5ctxp != NULL)
                {
                        free_MD5_CTX(md5ctxp);  
                        md5ctxp = NULL; /* never leave a dangling pointer */
                }

                if(pq != NULL)
                {
                        off_t highwater = 0;
                        size_t maxregions = 0;
                        (void) pq_highwater(pq, &highwater, &maxregions);
                        (void) pq_close(pq);
                        pq = NULL;

                        if(feed_close)
                                (*feed_close)(ifd);
                        ifd = -1;
                        unotice("  Queue usage (bytes):%8ld",
                                                (long)highwater);
                        unotice("           (nregions):%8ld",
                                                (long)maxregions);
                        unotice("  Duplicates rejected:%8lu", ndups);
                }
                (*prod_stats)();
                /*
                 * Same guard as for feed_close above.
                 * NOTE(review): assumes feed_stats is a function pointer
                 * that stays NULL until a feed is opened -- confirm in
                 * feed.h.
                 */
                if(feed_stats)
                        (*feed_stats)();
        }
        (void) closeulog();
}


/*
 * Common handler for every signal installed by set_sigactions().
 *
 *   SIGINT   log, set `intr' and exit(0) straight from the handler
 *   SIGTERM  set `done' so the main loop terminates
 *   SIGPIPE  with NET and a socket input: log and set `port_error'
 *   SIGUSR1  set `stats_req'
 *   SIGUSR2  toggle LOG_INFO logging and report the new state
 *
 * Any other signal is only reported at debug level.
 */
static void
signal_handler(int sig)
{
#ifdef SVR3SIGNALS
        /*
         * The handler was reset to SIG_DFL when we were entered on
         * such systems, so install ourselves again.
         */
        (void) signal(sig, signal_handler);
#endif
        if(sig == SIGINT)
        {
                unotice("Interrupt");
                intr = 1;
                exit(0);
        }
        else if(sig == SIGTERM)
        {
                udebug("SIGTERM");
                done = 1;
        }
        else if(sig == SIGPIPE)
        {
#if NET
                if(INPUT_IS_SOCKET)
                {
                        unotice("SIGPIPE");
                        port_error = 1;
                }
#endif
        }
        else if(sig == SIGUSR1)
        {
                udebug("SIGUSR1");
                stats_req = 1;
        }
        else if(sig == SIGUSR2)
        {
                udebug("SIGUSR2");
                if (toggleulogpri(LOG_INFO))
                        unotice("Going verbose");
                else
                        unotice("Going silent");
        }
        else
        {
                udebug("signal_handler: unhandled signal: %d", sig);
        }
}


/*
 * register the signal_handler
 */
static void
set_sigactions(void)
{
        struct sigaction sigact;

        sigemptyset(&sigact.sa_mask);
        sigact.sa_flags = 0;

        /* Ignore these */
        sigact.sa_handler = SIG_IGN;
        (void) sigaction(SIGHUP, &sigact, NULL);
        (void) sigaction(SIGALRM, &sigact, NULL);
        (void) sigaction(SIGCHLD, &sigact, NULL);

        /* Handle these */
#ifdef SA_RESTART       /* SVR4, 4.3+ BSD */
        /* usually, restart system calls */
        sigact.sa_flags |= SA_RESTART;
#endif
        sigact.sa_handler = signal_handler;
        (void) sigaction(SIGTERM, &sigact, NULL);
        (void) sigaction(SIGUSR1, &sigact, NULL);
        (void) sigaction(SIGUSR2, &sigact, NULL);

        /* Don't restart after interrupt */
        sigact.sa_flags = 0;
#ifdef SA_INTERRUPT     /* SunOS 4.x */
        sigact.sa_flags |= SA_INTERRUPT;
#endif
        (void) sigaction(SIGINT, &sigact, NULL);
        (void) sigaction(SIGPIPE, &sigact, NULL);
}


/*
 * Print the command-line synopsis on stderr and terminate the process
 * with exit status 1.  Does not return.
 *
 * av0: program name shown on the "Usage:" line.
 *
 * Long help lines are written as adjacent string literals so that no
 * literal is broken across a physical source line.
 */
static void
usage(
        char *av0 /*  id string */
)
{
        (void)fprintf(stderr,
                "Usage: %s [options] feedname\t\nOptions:\n", av0);
        (void)fprintf(stderr,
                "\t-v           Verbose, tell me about each product\n");
        (void)fprintf(stderr,
                "\t-r rawfile   Stash 'raw' data in \"rawfile\"\n");
        (void)fprintf(stderr,
                "\t-l logfile   Default logs to syslogd\n");
        (void)fprintf(stderr,
                "\t-f type      Claim to be feedtype \"type\", "
                "one of \"hds\", \"ddplus\", ...\n");
        (void)fprintf(stderr,
                "\t-b baud      Set baudrate for tty feed.\n");
        (void)fprintf(stderr,
                "\t-q queue     default \"%s\"\n", DEFAULT_QUEUE);
        (void)fprintf(stderr,
                "\t-p [even|odd|none]  Set parity for tty feed.\n");
        (void)fprintf(stderr,
                "\t-c           Enable checksum or parity check "
                "on non tty feed\n");
        (void)fprintf(stderr,
                "\t-n           Disable checksum or parity check "
                "on tty feed\n");
#if NET
        (void)fprintf(stderr,
                "\t-P port      Get input via TCP connection to host "
                "\"feedname\" at \"port\"\n");
        (void)fprintf(stderr,
                "\t-T timeout   Idle timeout before TCP reconnect, "
                "in seconds\n");
        (void)fprintf(stderr,
                "\t             (defaults to %d, 0 disables timeout)\n",
                        DEFAULT_RESET_SECS);
#endif
        exit(1);
}


/*
 * Wrap one product in a `struct product' and insert it into the queue.
 *
 * arrival: arrival timestamp recorded in the product info
 * seqno:   sequence number recorded in the product info
 * ident:   product identifier string (not copied)
 * len:     number of bytes in buf
 * buf:     product data (not copied)
 *
 * The signature is the MD5 digest of buf[0..len).  Origin and feedtype
 * come from the file-scope `myname' and `feedtype'.  With verbose logging
 * on, the product info is logged before the insert.  When no queue is
 * open ("feedtest") nothing is inserted.  A duplicate is counted in
 * `ndups'; any other insert failure is logged and the process exits
 * with status 1.
 */
void
toClients(timestampt arrival,
        unsigned seqno,
        const char *ident,
        unsigned len,
        const char *buf)
{
        static struct product prod;
        int rc;

        /* the product body and its digest */
        prod.data = (void *)buf; /* cast away const */
        prod.info.sz = len;
        MD5Init(md5ctxp);
        MD5Update(md5ctxp, (const unsigned char *)buf, len);
        MD5Final(prod.info.signature, md5ctxp);

        /* the remaining metadata */
        prod.info.arrival = arrival;
        prod.info.seqno = seqno;
        prod.info.ident = (char *)ident; /* cast away const */
        prod.info.origin = myname;
        prod.info.feedtype = feedtype;

        if(ulogIsVerbose())
                uinfo("%s", s_prod_info(NULL, 0, &prod.info, ulogIsDebug()));

        /* running as "feedtest": there is no queue to insert into */
        if(pq == NULL)
                return;

        rc = pq_insert(pq, &prod);
        if(rc != ENOERR)
        {
                if(rc != PQUEUE_DUP)
                {
                        /* a real failure */
                        uerror("pq_insert: %s\n",
                                rc > 0 ? strerror(rc) : "Internal error");
                        exit(1); /* ??? */
                }
                ndups++;
                uinfo("Product already in queue");
        }
}


/*
 * Load the file-scope `baud' and `parity' strings with the serial-line
 * defaults for the given feedtype.  Feedtypes not listed here leave both
 * variables untouched.
 */
static void
setFeedDefaults(feedtypet type)
{
        if(type == DDPLUS)
        {
                baud = "19200";
                parity = "even";
        }
        else if(type == PPS || type == DDS || type == IDS)
        {
                baud = "9600";
                parity = "even";
        }
        else if(type == HDS)
        {
                baud = "19200";
                parity = "none";
        }
        else if(type == AFOS)
        {
                baud = "4800"; /* ??? */
                parity = "none";
        }
        else if(type == FAA604)
        {
                baud = "1200";
                parity = "even";
        }
}


/*
 * Work out the feedtype from the name the program was invoked under.
 *
 * Sets the file-scope `progname' to the last path component of av0,
 * converts that name with atofeedtypet(), substitutes WMO when the
 * conversion yields NONE, loads the serial defaults for the resulting
 * type and returns it.
 */
static feedtypet 
whatami(const char *av0)
{
#define SEP     '/' /* separates components of path */
        const char *lastsep = strrchr(av0, SEP);
        feedtypet deduced;

        /* keep only what follows the final separator, if there is one */
        progname = (lastsep != NULL) ? lastsep + 1 : av0;

        deduced = atofeedtypet(progname);
        if(deduced == NONE)
                deduced = WMO; /* default for wmo ingestd */

        setFeedDefaults(deduced);
        return deduced;
}


int
main(int ac, char *av[])
{

        int logfd;
        int width;
        int ready;
        unsigned long idle;
        fd_set readfds;
        fd_set exceptfds;
        struct timeval timeo;

        feedtype = whatami(av[0]);

        /*
         * Check the environment for some options.
         * May be overridden by command line switches below.
         */
        {
                const char *ldmpqfname = getenv("LDMPQFNAME");
                if(ldmpqfname != NULL)
                        pqfname = ldmpqfname;
        }

        {
        extern int optind;
        extern int opterr;
        extern char *optarg;
        int ch;
        int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE));

        opterr = 1;

        while ((ch = getopt(ac, av, "vxcnl:b:p:P:T:q:r:f:")) != EOF)
                switch (ch) {
                case 'v':
                        logmask |= LOG_MASK(LOG_INFO);
                        break;
                case 'x':
                        logmask |= LOG_MASK(LOG_DEBUG);
                        break;
                case 'c':
                        chkflag = CHK_CHECK;
                        break;
                case 'n':
                        chkflag = CHK_DONT;
                        break;
                case 'l':
                        if(optarg[0] == '-' && optarg[1] != 0)
                        {
                                fprintf(stderr, "logfile \"%s\" ??\n",
                                        optarg);
                                usage(av[0]);
                        }
                        /* else */
                        logfname = optarg;
                        break;
                case 'b':
                        baud = optarg;
                        break;
                case 'p':
                        parity = optarg;
                        break;
#if NET
                case 'P':
                        *((int *)&server_port) = atoi(optarg); /* cast away 
const */
                        if(server_port <= 0 || server_port > 65536)
                        {
                                fprintf(stderr, "invalid server_port %s\n",
                                        optarg);
                                usage(av[0]);
                        }
                        break;
                case 'T':
                        reset_secs = atoi(optarg);
                        if(reset_secs < 0)
                        {
                                fprintf(stderr, "invalid timeout %s\n",
                                        optarg);
                                usage(av[0]);
                        }
                        break;
#endif /* NET */
                case 'q':
                        pqfname = optarg;
                        break;
                case 'r':
                        rawfname = optarg;
                        break;
                case 'f':
                        {
                                feedtypet type;
                                type = atofeedtypet(optarg);
                                if(type != NONE)
                                {
                                        feedtype = type;
                                        if(!parity && !baud)
                                                setFeedDefaults(type);
                                }
                        }
                        break;
                case '?':
                        usage(av[0]);
                        break;
                }

        /* last arg, feedfname, is required */
        if(ac - optind <= 0)
                usage(av[0]);
        strncat(feedfname,av[optind], sizeof(feedfname) - 6 );

        (void) setulogmask(logmask);
        }


        /*
         * initialize logger
         */
        if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0))
                (void) fclose(stderr);
        logfd = openulog(ubasename(av[0]),
                (LOG_CONS|LOG_PID), LOG_LDM, logfname);
        unotice("Starting Up");
        udebug(version);

        if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0))
        {
                setbuf(fdopen(logfd, "a"), NULL);
        }       

        /*
         * register exit handler
         */
        if(atexit(cleanup) != 0)
        {
                serror("atexit");
                return 1;
        }

        /*
         * set up signal handlers
         */
        set_sigactions();

        /*
         * open the product queue, unless we were invoked as "feedtest"
         */
        if(strcmp(progname, "feedtest") != 0)
        {
                if(ready = pq_open(pqfname, PQ_DEFAULT, &pq))
                {
                        uerror("pq_open: \"%s\" failed: %s",
                                pqfname, strerror(ready));
                        return 1;
                }
        }

        /*
         * who am i, anyway
         */
        (void) strcpy(myname, ghostname());

        /*
         * open the feed
         */
        if(!(*feedfname == '-' && feedfname[1] == 0) && logfd != 0)
                (void) close(0);

        if(open_feed(feedfname, &ifd) != ENOERR)
                return 1;

        if (feedtype & HDS)
        {
                if(chkflag == CHK_CHECK
                                || (isatty(ifd) && chkflag != CHK_DONT))
                        setTheScanner(scan_wmo_binary_crc);
                else
                        setTheScanner(scan_wmo_binary);
        }
        else if (feedtype == AFOS)
        {
                prod_stats = afos_stats;
                setTheScanner(scan_afos);
        }
        else if (feedtype == NMC2) /* CONDUIT */
        {
                setTheScanner(scan_wmo_binary);
        }
        else if (feedtype == FAA604)
        {
                prod_stats = faa604_stats;
                if(chkflag == CHK_CHECK
                        || (isatty(ifd)
                                 && chkflag != CHK_DONT
                                 && parity != NULL
                                 && *parity != 'n')
                        )
                {
                        setTheScanner(scan_faa604_parity);
                }
                else
                {
                        setTheScanner(scan_faa604);
                }
        }
        else
        {
                if(chkflag == CHK_CHECK
                        || (isatty(ifd)
                                 && chkflag != CHK_DONT
                                 && parity != NULL
                                 && *parity != 'n')
                        )
                {
                        setTheScanner(scan_wmo_parity);
                }
                else
                {
                        setTheScanner(scan_wmo);
                }
        }

        /*
         * Allocate an MD5 context
         */
        md5ctxp = new_MD5_CTX();
        if(md5ctxp == NULL)
        {
                serror("new_md5_CTX failed");
                return 1;
        }


        /*
         * Main Loop
         */
        idle = 0;
        while(!done)
        {
#if NET
if (INPUT_IS_SOCKET)
{
                if (port_error)
                {
                        /*
                         * lost connection => close
                         */
                        if (ifd >= 0)
                        {
                                if(feed_close)
                                        (*feed_close)(ifd);
                                ifd = -1;
                        }
                        port_error = 0;
                        sleep (2);      /* allow things to settle down */
                        continue;
                }
}
#endif
                if(stats_req)
                {
                        unotice("Statistics Request"); 
                        if(pq != NULL)
                        {
                                off_t highwater = 0;
                                size_t maxregions = 0;
                                (void) pq_highwater(pq, &highwater,
                                         &maxregions);
                                unotice("  Queue usage (bytes):%8ld",
                                                        (long)highwater);
                                unotice("           (nregions):%8ld",
                                                        (long)maxregions);
                        }
                        unotice("       Idle: %8lu seconds", idle);
#if NET
if (INPUT_IS_SOCKET)
{
                        unotice("    Timeout: %8d", reset_secs);
}
#endif
                        unotice("%21s: %s", "Status",
                                (ifd < 0) ?
                                "Not connected or input not open." :
                                "Connected.");
                        (*prod_stats)();
                        (*feed_stats)();
                        stats_req = 0;
                }
#if NET
if (INPUT_IS_SOCKET)
{
                if (ifd < 0)
                {
                        /* Attempt reconnect */
                        static int retries = 0;
                        if (retries > MAX_RETRIES)
                        {
                                uerror ("maximum retry attempts %d, aborting",
                                        MAX_RETRIES);
                                done = !0;
                                continue;
                        }
                        /* Try to reopen on tcp read errors */
                        unotice("Trying to re-open connection on port %d", 
                                server_port);
                        ++retries;
                        if(open_feed(feedfname, &ifd) != ENOERR)
                        {
                                unotice ("sleeping %d seconds before retry %d",
                                         retries * RETRY_DELAY, retries+1);
                                sleep (retries * RETRY_DELAY);
                                continue;
                        }
                        retries = 0;
                }
}
#endif /* NET */
                timeo.tv_sec = 3;
                timeo.tv_usec = 0;
                FD_ZERO(&readfds);
                FD_ZERO(&exceptfds);
                FD_SET(ifd, &readfds);
                FD_SET(ifd, &exceptfds);
                width =  ifd + 1;
                ready = select(width, &readfds, 0, &exceptfds, &timeo);
                if(ready < 0 )
                {
                        /* handle EINTR as a special case */
                        if(errno == EINTR)
                        {
                                errno = 0;
                                continue;
                        }
                        serror("select");
                        return 1;
                }
                /* else */
#if 0
                if (FD_ISSET(ifd, &exceptfds))
                {
                        uerror("Exception on input fd %d, select returned %d",
                               ifd, ready);
                }
#endif
                if(ready > 0)
                {
                        /* do some work */
                        if(FD_ISSET(ifd, &readfds) || 
                           FD_ISSET(ifd, &exceptfds))
                        {
                                idle = 0;
                                if(feedTheXbuf(ifd) != ENOERR)
                                {
#if NET
if (INPUT_IS_SOCKET)
{
                                        port_error = !0;
                                        continue;
}                                       /* else */
#endif /* NET */
                                        done = !0;
                                }
                                FD_CLR(ifd, &readfds);
                                FD_CLR(ifd, &exceptfds);
                        }
                        else
                        {
                                uerror("select returned %d but ifd not set",
                                        ready);
                                idle += timeo.tv_sec;
                        }
                }
                else    /* ready == 0 */
                {
                        idle += timeo.tv_sec;
#if NET
if (INPUT_IS_SOCKET)
{
                        /* VOODOO
                         * This is necessary to stimulate
                         * 'Connection reset by peer'
                         * when the Portmaster goes down and comes
                         * back up.
                         */
                        static char zed[1] = {0};
                        if(write(ifd, zed, sizeof(zed)) < 0)
                        {
                                port_error = !0;
                                continue;
                        }

}
#endif
                }
#if NET
if (INPUT_IS_SOCKET)
{
                if ((reset_secs > 0) && (idle >= reset_secs))
                {
                        unotice("Idle for %ld seconds, reconnecting",
                                idle);
                        /* force reconnect */
                        port_error = !0;
                        idle = 0;
                        continue;
                }
}
#endif /* NET */
                (void) scanTheXbuf();
        }

        return 0;
}