[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: LDM 5.0 vs 5.0.6 AFOS
- Subject: Re: LDM 5.0 vs 5.0.6 AFOS
- Date: Tue, 2 Mar 1999 12:30:00 -0700 (MST)
David,
The pqing.c shipped with 5.0.6 had an AFOS bug in it. I'm including the
fixed pqing.c code; this should work for you.
Robb...
On Tue, 2 Mar 1999, David Magee wrote:
>
> I've got a Sun Sparc 2 with a NOAA WeatherWire feed into ttya.
>
> Under ldm 5.0, the system works flawlessly.
>
> Under ldm 5.0.6, no data is seen coming in by LDM.
>
> In ldmd.conf:
> exec "afos -v -b 4800 -p none /dev/ttya"
>
> I had similar problems trying to upgrade from 5.0 to 5.0.2
>
> Any suggestions? Am I stuck at 5.0 ? I believe the problem lies within
> pqing.
>
>
>
> Thanks-
>
> David
>
>
>
>
> --
>
> David K. Magee
> address@hidden
>
>
>
===============================================================================
Robb Kambic Unidata Program Center
Software Engineer III Univ. Corp for Atmospheric Research
address@hidden WWW: http://www.unidata.ucar.edu/
===============================================================================
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: pqing.c,v 1.66 1999/02/09 03:17:09 davis Exp $ */
/* RCS revision plus compile date/time; main() logs it at debug level on startup. */
static char version[] =
"$Revision: 1.66 $ built "__DATE__" "__TIME__;
#include <ldmconfig.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <limits.h> /* PATH_MAX */
#ifndef PATH_MAX
#define PATH_MAX 255
#endif /* !PATH_MAX */
#include <sys/types.h>
#include <sys/time.h>
#include <rpc/rpc.h>
#include <errno.h>
#include "ulog.h"
#include "inetutil.h"
#include "ldm.h"
#include "feed.h"
#include "atofeedt.h"
#include "ldmprint.h"
#include "wmo_message.h"
#include "afos_message.h"
#include "faa604_message.h"
#include "paths.h"
#include "pq.h"
#include "md5.h"
#ifdef NO_ATEXIT
#include "atexit.h"
#endif
#if NET
/*
 * Reconnect policy used by the main loop when the input is a socket:
 * after the k-th failed attempt it sleeps k * RETRY_DELAY seconds, and it
 * gives up once the attempt counter exceeds MAX_RETRIES.
 */
#define RETRY_DELAY (10) /* delay factor between retries */
#define MAX_RETRIES (30)
#endif /* NET */
/*
 * set by command line
 *
 * These three have external linkage; presumably they are read by the feed
 * code in another translation unit -- confirm against feed.h.
 */
char *baud = NULL; /* "-b"; default filled in by setFeedDefaults() */
char *rawfname = NULL; /* "-r" */
char *parity = NULL; /* "-p"; default filled in by setFeedDefaults() */
/* "-c" selects CHK_CHECK, "-n" selects CHK_DONT; otherwise left unset. */
enum { CHK_UNSET, CHK_CHECK, CHK_DONT} chkflag = CHK_UNSET;
static feedtypet feedtype = NONE; /* deduce from av[0] */
static const char *progname = NULL; /* basename of av[0], set by whatami() */
static char *logfname = ""; /* "-l"; "-" is treated specially in main() */
static char feedfname[PATH_MAX]; /* last command line argument */
static char myname[HOSTNAMESIZE]; /* result of ghostname(); product origin */
/* Product queue path; LDMPQFNAME and then "-q" override the default. */
static const char *pqfname = DEFAULT_QUEUE;
static pqueue *pq = NULL; /* stays NULL when invoked as "feedtest" */
static int ifd = -1; /* input feed descriptor, -1 when not open */
/* Flags shared between signal_handler() and the main loop. */
static volatile int done = 0; /* SIGTERM or fatal input error: leave loop */
static volatile int intr = 0; /* SIGINT: cleanup() is running from the handler */
static volatile int stats_req = 0; /* SIGUSR1: log statistics on next pass */
/* Per-feedtype statistics reporter; main() swaps it for AFOS and FAA604. */
static void (*prod_stats)(void) = wmo_stats;
static unsigned long ndups = 0; /* products rejected with PQUEUE_DUP */
static MD5_CTX *md5ctxp = NULL; /* allocated in main(), used by toClients() */
#if NET
static int port_error = 0; /* indicate sigpipe condition was raised */
# ifndef DEFAULT_RESET_SECS
# define DEFAULT_RESET_SECS 600
# endif
/* "-T": idle seconds before forcing a reconnect; 0 disables the timeout. */
static int reset_secs = DEFAULT_RESET_SECS;
#endif
/*
 * Exit handler (registered with atexit() by main()).
 *
 * Always logs "Exiting" and closes the log.  Unless we got here from the
 * SIGINT handler (intr set), it also releases the MD5 context, closes the
 * product queue and the feed, and logs the final usage statistics.
 */
static void
cleanup(void)
{
    unotice("Exiting");

    if(intr)
    {
        /* Interrupt context: do the bare minimum and leave. */
        (void) closeulog();
        return;
    }

    if(md5ctxp != NULL)
        free_MD5_CTX(md5ctxp);

    if(pq != NULL)
    {
        off_t bytes_used = 0;
        size_t regions_used = 0;

        /* Grab the high-water marks before the queue goes away. */
        (void) pq_highwater(pq, &bytes_used, &regions_used);
        (void) pq_close(pq);
        pq = NULL;

        if(feed_close)
            (*feed_close)(ifd);
        ifd = -1;

        unotice(" Queue usage (bytes):%8ld", (long)bytes_used);
        unotice(" (nregions):%8ld", (long)regions_used);
        unotice(" Duplicates rejected:%8lu", ndups);
    }

    (*prod_stats)();
    (*feed_stats)();

    (void) closeulog();
}
/*
 * Common handler for the signals registered in set_sigactions().
 *
 *   SIGINT   log, mark the interrupt context and exit(0)
 *   SIGTERM  ask the main loop to finish
 *   SIGPIPE  (NET builds, socket input only) flag a port error
 *   SIGUSR1  ask the main loop to log statistics
 *   SIGUSR2  toggle LOG_INFO logging
 *
 * Any other signal is reported at debug level.
 */
static void
signal_handler(int sig)
{
#ifdef SVR3SIGNALS
    /*
     * On systems that revert the disposition to SIG_DFL when the handler
     * is entered, install ourselves again.
     */
    (void) signal(sig, signal_handler);
#endif
    switch(sig) {
    case SIGINT :
        unotice("Interrupt");
        intr = 1;
        exit(0);
    case SIGTERM :
        udebug("SIGTERM");
        done = 1;
        return;
    case SIGPIPE :
#if NET
        if(INPUT_IS_SOCKET)
        {
            unotice("SIGPIPE");
            port_error = 1;
        }
#endif
        return;
    case SIGUSR1 :
        udebug("SIGUSR1");
        stats_req = 1;
        return;
    case SIGUSR2 :
        {
            const int now_verbose = toggleulogpri(LOG_INFO);

            udebug("SIGUSR2");
            if (!now_verbose)
                unotice("Going silent");
            else
                unotice("Going verbose");
        }
        return;
    default :
        udebug("signal_handler: unhandled signal: %d", sig);
        return;
    }
}
/*
* register the signal_handler
*/
static void
set_sigactions(void)
{
struct sigaction sigact;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = 0;
/* Ignore these */
sigact.sa_handler = SIG_IGN;
(void) sigaction(SIGHUP, &sigact, NULL);
(void) sigaction(SIGALRM, &sigact, NULL);
(void) sigaction(SIGCHLD, &sigact, NULL);
/* Handle these */
#ifdef SA_RESTART /* SVR4, 4.3+ BSD */
/* usually, restart system calls */
sigact.sa_flags |= SA_RESTART;
#endif
sigact.sa_handler = signal_handler;
(void) sigaction(SIGTERM, &sigact, NULL);
(void) sigaction(SIGUSR1, &sigact, NULL);
(void) sigaction(SIGUSR2, &sigact, NULL);
/* Don't restart after interrupt */
sigact.sa_flags = 0;
#ifdef SA_INTERRUPT /* SunOS 4.x */
sigact.sa_flags |= SA_INTERRUPT;
#endif
(void) sigaction(SIGINT, &sigact, NULL);
(void) sigaction(SIGPIPE, &sigact, NULL);
}
/*
 * Print the command line synopsis to stderr and terminate.
 *
 * av0: program name shown in the "Usage:" line.
 *
 * Does not return; always calls exit(1).
 *
 * Long help strings are split with adjacent-literal concatenation so that
 * no string literal spans a physical line break.
 */
static void
usage(
    char *av0 /* id string */
)
{
    (void)fprintf(stderr,
        "Usage: %s [options] feedname\t\nOptions:\n", av0);
    (void)fprintf(stderr,
        "\t-v Verbose, tell me about each product\n");
    (void)fprintf(stderr,
        "\t-r rawfile Stash 'raw' data in \"rawfile\"\n");
    (void)fprintf(stderr,
        "\t-l logfile Default logs to syslogd\n");
    (void)fprintf(stderr,
        "\t-f type Claim to be feedtype \"type\", one of \"hds\", "
        "\"ddplus\", ...\n");
    (void)fprintf(stderr,
        "\t-b baud Set baudrate for tty feed.\n");
    (void)fprintf(stderr,
        "\t-q queue default \"%s\"\n", DEFAULT_QUEUE);
    (void)fprintf(stderr,
        "\t-p [even|odd|none] Set parity for tty feed.\n");
    (void)fprintf(stderr,
        "\t-c Enable checksum or parity check on non tty "
        "feed\n");
    (void)fprintf(stderr,
        "\t-n Disable checksum or parity check on tty "
        "feed\n");
#if NET
    (void)fprintf(stderr,
        "\t-P port Get input via TCP connection to host "
        "\"feedname\" at \"port\"\n");
    (void)fprintf(stderr,
        "\t-T timeout Idle timeout before TCP reconnect, in "
        "seconds\n");
    (void)fprintf(stderr,
        "\t (defaults to %d, 0 disables timeout)\n",
        DEFAULT_RESET_SECS);
#endif
    exit(1);
}
/*
 * Hand one complete product to the product queue.
 *
 * arrival: arrival timestamp recorded in the product info
 * seqno:   sequence number recorded in the product info
 * ident:   product identifier string (not copied)
 * len:     number of bytes in buf
 * buf:     product data (not copied)
 *
 * The product signature is the MD5 digest of buf[0..len-1].  When verbose
 * logging is on, the product info is logged.  If no queue is open (the
 * "feedtest" case) nothing else happens.  A duplicate is counted and
 * logged; any other insertion failure is logged and terminates the
 * process with exit(1).
 */
void
toClients(timestampt arrival,
    unsigned seqno,
    const char *ident,
    unsigned len,
    const char *buf)
{
    static struct product prod;
    int rc;

    /* Signature first: digest of the raw product bytes. */
    MD5Init(md5ctxp);
    MD5Update(md5ctxp, (const unsigned char *)buf, len);
    MD5Final(prod.info.signature, md5ctxp);

    /* Remaining metadata; the const casts only satisfy the struct types. */
    prod.info.arrival = arrival;
    prod.info.origin = myname;
    prod.info.feedtype = feedtype;
    prod.info.seqno = seqno;
    prod.info.ident = (char *)ident;
    prod.info.sz = len;
    prod.data = (void *)buf;

    if(ulogIsVerbose())
        uinfo("%s", s_prod_info(NULL, 0, &prod.info, ulogIsDebug()));

    if(pq != NULL)
    {
        rc = pq_insert(pq, &prod);
        if(rc != ENOERR)
        {
            if(rc == PQUEUE_DUP)
            {
                ndups++;
                uinfo("Product already in queue");
            }
            else
            {
                uerror("pq_insert: %s\n",
                    status_text(rc));
                exit(1); /* ??? */
            }
        }
    }
}
/*
 * Load the default serial line settings (globals "baud" and "parity")
 * for the given feed type.  Feed types not listed here leave both
 * globals untouched.
 */
static void
setFeedDefaults(feedtypet type)
{
    char *dflt_baud;
    char *dflt_parity;

    if(type == DDPLUS)
    {
        dflt_baud = "19200";
        dflt_parity = "even";
    }
    else if(type == PPS || type == DDS || type == IDS)
    {
        dflt_baud = "9600";
        dflt_parity = "even";
    }
    else if(type == HDS)
    {
        dflt_baud = "19200";
        dflt_parity = "none";
    }
    else if(type == AFOS)
    {
        dflt_baud = "4800"; /* ??? */
        dflt_parity = "none";
    }
    else if(type == FAA604)
    {
        dflt_baud = "1200";
        dflt_parity = "even";
    }
    else
    {
        return; /* no defaults known for this type */
    }

    baud = dflt_baud;
    parity = dflt_parity;
}
/*
 * Work out the feed type from the name the program was invoked under.
 *
 * Stores the basename of av0 in the global "progname", maps it to a feed
 * type with atofeedtypet() (falling back to WMO when that yields NONE),
 * loads the serial defaults for that type and returns it.
 */
static feedtypet
whatami(const char *av0)
{
#define SEP '/' /* separates components of path */
    const char *last_sep = strrchr(av0, SEP);
    feedtypet detected;

    /* basename: everything after the final separator, if there is one */
    progname = (last_sep != NULL) ? last_sep + 1 : av0;

    detected = atofeedtypet(progname);
    if(detected == NONE)
        detected = WMO; /* default for wmo ingestd */

    setFeedDefaults(detected);
    return detected;
}
/*
 * Program entry point.
 *
 * Deduces the feed type from av[0] (overridable with "-f"), reads the
 * LDMPQFNAME environment variable and the command line, opens the log,
 * registers cleanup() and the signal handlers, opens the product queue
 * (unless invoked as "feedtest") and the input feed, picks the scanner
 * for the feed type, and then loops until "done" is set: wait on the
 * input descriptor with select(), pass it to feedTheXbuf() when ready,
 * and call scanTheXbuf().
 *
 * In NET builds with socket input the loop also closes and reopens the
 * connection after a port error or after "reset_secs" idle seconds.
 *
 * Returns 0 on normal termination and 1 on a startup or select()
 * failure.  usage() exits with status 1 on bad arguments.
 */
int
main(int ac, char *av[])
{
    /*
     * Poll interval for select().  Kept as a constant because POSIX
     * allows select() to modify its timeout argument, so the struct
     * cannot be relied on afterwards for idle-time accounting.
     */
    enum { SELECT_TIMEOUT_SECS = 3 };

    int logfd;
    int width;
    int ready;
    unsigned long idle;
    fd_set readfds;
    fd_set exceptfds;
    struct timeval timeo;

    feedtype = whatami(av[0]);

    /*
     * Check the environment for some options.
     * May be overridden by command line switches below.
     */
    {
        const char *ldmpqfname = getenv("LDMPQFNAME");
        if(ldmpqfname != NULL)
            pqfname = ldmpqfname;
    }

    {
        extern int optind;
        extern int opterr;
        extern char *optarg;
        int ch;
        int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE));

        opterr = 1;

        /* getopt() is specified to return -1 (not EOF) when done */
        while ((ch = getopt(ac, av, "vxcnl:b:p:P:T:q:r:f:")) != -1)
            switch (ch) {
            case 'v':
                logmask |= LOG_MASK(LOG_INFO);
                break;
            case 'x':
                logmask |= LOG_MASK(LOG_DEBUG);
                break;
            case 'c':
                chkflag = CHK_CHECK;
                break;
            case 'n':
                chkflag = CHK_DONT;
                break;
            case 'l':
                /* a lone "-" is accepted; "-something" is a mistake */
                if(optarg[0] == '-' && optarg[1] != 0)
                {
                    fprintf(stderr, "logfile \"%s\" ??\n",
                        optarg);
                    usage(av[0]);
                }
                /* else */
                logfname = optarg;
                break;
            case 'b':
                baud = optarg;
                break;
            case 'p':
                parity = optarg;
                break;
#if NET
            case 'P':
                {
                    const int port = atoi(optarg);

                    /* valid TCP ports are 1..65535 */
                    if(port <= 0 || port > 65535)
                    {
                        fprintf(stderr, "invalid server_port %s\n",
                            optarg);
                        usage(av[0]);
                    }
                    *((int *)&server_port) = port; /* cast away const */
                }
                break;
            case 'T':
                reset_secs = atoi(optarg);
                if(reset_secs < 0)
                {
                    fprintf(stderr, "invalid timeout %s\n",
                        optarg);
                    usage(av[0]);
                }
                break;
#endif /* NET */
            case 'q':
                pqfname = optarg;
                break;
            case 'r':
                rawfname = optarg;
                break;
            case 'f':
                {
                    feedtypet type;
                    type = atofeedtypet(optarg);
                    if(type != NONE)
                    {
                        feedtype = type;
                        /* only when neither was set so far */
                        if(!parity && !baud)
                            setFeedDefaults(type);
                    }
                }
                break;
            case '?':
                usage(av[0]);
                break;
            }

        /* last arg, feedfname, is required */
        if(ac - optind <= 0)
            usage(av[0]);
        /*
         * feedfname is a zero-initialized static, so this acts as a
         * bounded copy.
         * NOTE(review): the 6 byte margin presumably leaves room for
         * something appended elsewhere -- confirm.
         */
        strncat(feedfname, av[optind], sizeof(feedfname) - 6);

        (void) setulogmask(logmask);
    }

    /*
     * initialize logger
     */
    if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0))
        (void) fclose(stderr);
    logfd = openulog(ubasename(av[0]),
        (LOG_CONS|LOG_PID), LOG_LDM, logfname);
    unotice("Starting Up");
    /* never pass a non-literal as the format string */
    udebug("%s", version);
    if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0))
    {
        /* fdopen() fails if logfd is not a usable descriptor */
        FILE *logfp = fdopen(logfd, "a");
        if(logfp != NULL)
            setbuf(logfp, NULL);
    }

    /*
     * register exit handler
     */
    if(atexit(cleanup) != 0)
    {
        serror("atexit");
        return 1;
    }

    /*
     * set up signal handlers
     */
    set_sigactions();

    /*
     * open the product queue, unless we were invoked as "feedtest"
     */
    if(strcmp(progname, "feedtest") != 0)
    {
        ready = pq_open(pqfname, PQ_DEFAULT, &pq);
        if(ready != 0)
        {
            uerror("pq_open: \"%s\" failed: %s",
                pqfname, strerror(ready));
            return 1;
        }
    }

    /*
     * who am i, anyway
     * (bounded copy: myname is a fixed-size buffer)
     */
    (void) strncpy(myname, ghostname(), sizeof(myname) - 1);
    myname[sizeof(myname) - 1] = 0;

    /*
     * open the feed
     */
    if(!(*feedfname == '-' && feedfname[1] == 0) && logfd != 0)
        (void) close(0);
    if(open_feed(feedfname, &ifd) != ENOERR)
        return 1;

    /*
     * Pick the scanner (and statistics reporter) for the feed type.
     * Checksum/parity checking is on when "-c" was given, or by default
     * on a tty unless "-n" was given (and, for parity, unless the parity
     * setting starts with 'n').
     */
    if (feedtype & HDS)
    {
        if(chkflag == CHK_CHECK
                || (isatty(ifd) && chkflag != CHK_DONT))
            setTheScanner(scan_wmo_binary_crc);
        else
            setTheScanner(scan_wmo_binary);
    }
    else if (feedtype == AFOS)
    {
        prod_stats = afos_stats;
        setTheScanner(scan_afos);
    }
    else if (feedtype == NMC2) /* CONDUIT */
    {
        setTheScanner(scan_wmo_binary);
    }
    else if (feedtype == FAA604)
    {
        prod_stats = faa604_stats;
        if(chkflag == CHK_CHECK
                || (isatty(ifd)
                    && chkflag != CHK_DONT
                    && parity != NULL
                    && *parity != 'n')
            )
        {
            setTheScanner(scan_faa604_parity);
        }
        else
        {
            setTheScanner(scan_faa604);
        }
    }
    else
    {
        if(chkflag == CHK_CHECK
                || (isatty(ifd)
                    && chkflag != CHK_DONT
                    && parity != NULL
                    && *parity != 'n')
            )
        {
            setTheScanner(scan_wmo_parity);
        }
        else
        {
            setTheScanner(scan_wmo);
        }
    }

    /*
     * Allocate an MD5 context
     */
    md5ctxp = new_MD5_CTX();
    if(md5ctxp == NULL)
    {
        serror("new_md5_CTX failed");
        return 1;
    }

    /*
     * Main Loop
     */
    idle = 0;
    while(!done)
    {
#if NET
        if (INPUT_IS_SOCKET)
        {
            if (port_error)
            {
                /*
                 * lost connection => close
                 */
                if (ifd >= 0)
                {
                    if(feed_close)
                        (*feed_close)(ifd);
                    ifd = -1;
                }
                port_error = 0;
                sleep (2); /* allow things to settle down */
                continue;
            }
        }
#endif
        if(stats_req)
        {
            unotice("Statistics Request");
            if(pq != NULL)
            {
                off_t highwater = 0;
                size_t maxregions = 0;
                (void) pq_highwater(pq, &highwater,
                    &maxregions);
                unotice(" Queue usage (bytes):%8ld",
                    (long)highwater);
                unotice(" (nregions):%8ld",
                    (long)maxregions);
            }
            unotice(" Idle: %8lu seconds", idle);
#if NET
            if (INPUT_IS_SOCKET)
            {
                unotice(" Timeout: %8d", reset_secs);
            }
#endif
            unotice("%21s: %s", "Status",
                (ifd < 0) ?
                "Not connected or input not open." :
                "Connected.");
            (*prod_stats)();
            (*feed_stats)();
            stats_req = 0;
        }
#if NET
        if (INPUT_IS_SOCKET)
        {
            if (ifd < 0)
            {
                /* Attempt reconnect */
                static int retries = 0;
                if (retries > MAX_RETRIES)
                {
                    uerror ("maximum retry attempts %d, aborting",
                        MAX_RETRIES);
                    done = !0;
                    continue;
                }
                /* Try to reopen on tcp read errors */
                unotice("Trying to re-open connection on port %d",
                    server_port);
                ++retries;
                if(open_feed(feedfname, &ifd) != ENOERR)
                {
                    /* back off a little longer after each failure */
                    unotice ("sleeping %d seconds before retry %d",
                        retries * RETRY_DELAY, retries+1);
                    sleep (retries * RETRY_DELAY);
                    continue;
                }
                retries = 0;
            }
        }
#endif /* NET */
        /* re-arm the timeout every pass; select() may have changed it */
        timeo.tv_sec = SELECT_TIMEOUT_SECS;
        timeo.tv_usec = 0;
        FD_ZERO(&readfds);
        FD_ZERO(&exceptfds);
        FD_SET(ifd, &readfds);
        FD_SET(ifd, &exceptfds);
        width = ifd + 1;
        ready = select(width, &readfds, 0, &exceptfds, &timeo);
        if(ready < 0 )
        {
            /* handle EINTR as a special case */
            if(errno == EINTR)
            {
                errno = 0;
                continue;
            }
            serror("select");
            return 1;
        }
        /* else */
#if 0
        if (FD_ISSET(ifd, &exceptfds))
        {
            uerror("Exception on input fd %d, select returned %d",
                ifd, ready);
        }
#endif
        if(ready > 0)
        {
            /* do some work */
            if(FD_ISSET(ifd, &readfds) ||
                    FD_ISSET(ifd, &exceptfds))
            {
                idle = 0;
                if(feedTheXbuf(ifd) != ENOERR)
                {
#if NET
                    if (INPUT_IS_SOCKET)
                    {
                        port_error = !0;
                        continue;
                    } /* else */
#endif /* NET */
                    done = !0;
                }
                FD_CLR(ifd, &readfds);
                FD_CLR(ifd, &exceptfds);
            }
            else
            {
                uerror("select returned %d but ifd not set",
                    ready);
                idle += SELECT_TIMEOUT_SECS;
            }
        }
        else /* ready == 0 */
        {
            /* a full poll interval passed without input */
            idle += SELECT_TIMEOUT_SECS;
#if NET
            if (INPUT_IS_SOCKET)
            {
                /* VOODOO
                 * This is necessary to stimulate
                 * 'Connection reset by peer'
                 * when the Portmaster goes down and comes
                 * back up.
                 */
                static char zed[1] = {0};
                if(write(ifd, zed, sizeof(zed)) < 0)
                {
                    port_error = !0;
                    continue;
                }
            }
#endif
        }
#if NET
        if (INPUT_IS_SOCKET)
        {
            /* reset_secs is known positive before the unsigned compare */
            if ((reset_secs > 0)
                    && (idle >= (unsigned long)reset_secs))
            {
                unotice("Idle for %lu seconds, reconnecting",
                    idle);
                /* force reconnect */
                port_error = !0;
                idle = 0;
                continue;
            }
        }
#endif /* NET */
        (void) scanTheXbuf();
    }

    return 0;
}