This archive contains answers to questions sent to Unidata support through mid-2025. Note that the archive is no longer being updated. We provide the archive for reference; many of the answers presented here remain technically correct, even if somewhat outdated. For the most up-to-date information on the use of NSF Unidata software and data services, please consult the Software Documentation first.
David, The pqing.c shipped with 5.0.6 had an AFOS bug in it. I'm including the fixed pqing.c code; this should work for you. Robb... On Tue, 2 Mar 1999, David Magee wrote: > > I've got a Sun Sparc 2 with a NOAA WeatherWire feed into ttya. > > Under ldm 5.0, the system works flawlessly. > > Under ldm 5.0.6, no data is seen coming in by LDM. > > In ldmd.conf: > exec "afos -v -b 4800 -p none /dev/ttya" > > I had similar problems trying to upgrade from 5.0 to 5.0.2 > > Any suggestions? Am I stuck at 5.0 ? I believe the problem lies within > pqing. > > > > Thanks- > > David > > > > > -- > > David K. Magee > address@hidden > > > =============================================================================== Robb Kambic Unidata Program Center Software Engineer III Univ. Corp for Atmospheric Research address@hidden WWW: http://www.unidata.ucar.edu/ ===============================================================================
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: pqing.c,v 1.66 1999/02/09 03:17:09 davis Exp $ */
/*
 * RCS revision of this file plus the compile date and time.
 * main() writes it to the log at debug level during start-up.
 */
static char version[] =
"$Revision: 1.66 $ built "__DATE__" "__TIME__;
#include <ldmconfig.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <limits.h> /* PATH_MAX */
#ifndef PATH_MAX
#define PATH_MAX 255
#endif /* !PATH_MAX */
#include <sys/types.h>
#include <sys/time.h>
#include <rpc/rpc.h>
#include <errno.h>
#include "ulog.h"
#include "inetutil.h"
#include "ldm.h"
#include "feed.h"
#include "atofeedt.h"
#include "ldmprint.h"
#include "wmo_message.h"
#include "afos_message.h"
#include "faa604_message.h"
#include "paths.h"
#include "pq.h"
#include "md5.h"
#ifdef NO_ATEXIT
#include "atexit.h"
#endif
/*
 * Reconnect policy for network feeds: after the n-th consecutive failed
 * reconnect main() sleeps n * RETRY_DELAY seconds, and it gives up once
 * more than MAX_RETRIES consecutive attempts have failed.
 */
#if NET
#define RETRY_DELAY (10) /* delay factor between retries */
#define MAX_RETRIES (30)
#endif /* NET */
/*
 * set by command line
 * (baud and parity may also be preset from the feed type by
 * setFeedDefaults(); these three are not static, presumably because the
 * feed code reads them -- TODO confirm against feed.c)
 */
/* -b: baud rate for a tty feed, as a string */
char *baud = NULL;
/* -r: file in which to stash the raw input */
char *rawfname = NULL;
/* -p: parity for a tty feed ("even", "odd" or "none") */
char *parity = NULL;
/* -c selects CHK_CHECK, -n selects CHK_DONT; otherwise main() decides */
enum { CHK_UNSET, CHK_CHECK, CHK_DONT} chkflag = CHK_UNSET;
static feedtypet feedtype = NONE; /* deduce from av[0] */
/* last path component of av[0]; set by whatami() */
static const char *progname = NULL;
/* -l: log destination handed to openulog(); "-" keeps stderr open */
static char *logfname = "";
/* name of the feed to open, taken from the last command line argument */
static char feedfname[PATH_MAX];
/* this host's name from ghostname(); recorded as each product's origin */
static char myname[HOSTNAMESIZE];
/* product queue path: default, then $LDMPQFNAME, then -q */
static const char *pqfname = DEFAULT_QUEUE;
/* open product queue; stays NULL when invoked as "feedtest" */
static pqueue *pq = NULL;
/* input descriptor filled in by open_feed(); -1 while not open */
static int ifd = -1;
/* set by the SIGTERM handler (and on fatal feed errors) to end the main loop */
static volatile int done = 0;
/* set by the SIGINT handler so that cleanup() skips its normal work */
static volatile int intr = 0;
/* set by the SIGUSR1 handler; main loop then logs statistics */
static volatile int stats_req = 0;
/* product statistics reporter; main() swaps it for AFOS and FAA604 feeds */
static void (*prod_stats)(void) = wmo_stats;
/* number of products pq_insert() rejected as duplicates */
static unsigned long ndups = 0;
/* MD5 context reused by toClients() to compute product signatures */
static MD5_CTX *md5ctxp = NULL;
#if NET
/* also set by main() on read/write failure and idle timeout to force a reconnect */
static int port_error = 0; /* indicate sigpipe condition was raised */
# ifndef DEFAULT_RESET_SECS
# define DEFAULT_RESET_SECS 600
# endif
/* -T: idle seconds before a socket feed is reconnected; 0 disables */
static int reset_secs = DEFAULT_RESET_SECS;
#endif
/*
 * Exit handler, registered with atexit() by main().
 *
 * Always logs "Exiting" and closes the log.  Unless the exit was
 * triggered from the SIGINT handler (intr set), it also frees the MD5
 * context, closes the product queue, closes the input feed and logs the
 * queue, duplicate, product and feed statistics.
 *
 * The feed is closed whether or not a product queue was opened, so that
 * a "feedtest" run (pq == NULL) releases its input as well, and only if
 * a feed descriptor is actually open.
 */
static void
cleanup(void)
{
        unotice("Exiting");

        if(!intr)
        {
                /* We are not in the interrupt context */
                const int had_queue = (pq != NULL);
                off_t highwater = 0;
                size_t maxregions = 0;

                if(md5ctxp != NULL)
                {
                        free_MD5_CTX(md5ctxp);
                        md5ctxp = NULL; /* never leave a dangling pointer */
                }

                if(had_queue)
                {
                        /* sample the usage figures before the queue goes away */
                        (void) pq_highwater(pq, &highwater, &maxregions);
                        (void) pq_close(pq);
                        pq = NULL;
                }

                /* same guard the main loop uses before closing the feed */
                if(ifd >= 0 && feed_close)
                        (*feed_close)(ifd);
                ifd = -1;

                if(had_queue)
                {
                        unotice(" Queue usage (bytes):%8ld",
                                (long)highwater);
                        unotice(" (nregions):%8ld",
                                (long)maxregions);
                        unotice(" Duplicates rejected:%8lu", ndups);
                }

                (*prod_stats)();
                (*feed_stats)();
        }

        (void) closeulog();
}
/*
 * Handler for the signals this program catches.
 *
 *   SIGINT   log, mark the interrupt context (intr) and exit at once
 *   SIGTERM  ask the main loop to finish (done)
 *   SIGPIPE  on a socket feed, flag a port error so the main loop
 *            reconnects; otherwise ignored
 *   SIGUSR1  ask the main loop to log statistics (stats_req)
 *   SIGUSR2  toggle verbose logging
 *
 * Any other signal is only reported at debug level.
 */
static void
signal_handler(int sig)
{
#ifdef SVR3SIGNALS
        /*
         * With SVR3 signal semantics the disposition is reset to SIG_DFL
         * when the handler is entered, so install ourselves again.
         */
        (void) signal(sig, signal_handler);
#endif

        if(sig == SIGINT)
        {
                unotice("Interrupt");
                intr = !0;
                exit(0);
        }

        if(sig == SIGTERM)
        {
                udebug("SIGTERM");
                done = !0;
                return;
        }

        if(sig == SIGPIPE)
        {
#if NET
                if(INPUT_IS_SOCKET)
                {
                        unotice("SIGPIPE");
                        port_error = !0;
                }
#endif
                return;
        }

        if(sig == SIGUSR1)
        {
                udebug("SIGUSR1");
                stats_req = !0;
                return;
        }

        if(sig == SIGUSR2)
        {
                udebug("SIGUSR2");
                if (toggleulogpri(LOG_INFO))
                {
                        unotice("Going verbose");
                }
                else
                {
                        unotice("Going silent");
                }
                return;
        }

        /* nothing above claimed the signal */
        udebug("signal_handler: unhandled signal: %d", sig);
}
/*
* register the signal_handler
*/
static void
set_sigactions(void)
{
struct sigaction sigact;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = 0;
/* Ignore these */
sigact.sa_handler = SIG_IGN;
(void) sigaction(SIGHUP, &sigact, NULL);
(void) sigaction(SIGALRM, &sigact, NULL);
(void) sigaction(SIGCHLD, &sigact, NULL);
/* Handle these */
#ifdef SA_RESTART /* SVR4, 4.3+ BSD */
/* usually, restart system calls */
sigact.sa_flags |= SA_RESTART;
#endif
sigact.sa_handler = signal_handler;
(void) sigaction(SIGTERM, &sigact, NULL);
(void) sigaction(SIGUSR1, &sigact, NULL);
(void) sigaction(SIGUSR2, &sigact, NULL);
/* Don't restart after interrupt */
sigact.sa_flags = 0;
#ifdef SA_INTERRUPT /* SunOS 4.x */
sigact.sa_flags |= SA_INTERRUPT;
#endif
(void) sigaction(SIGINT, &sigact, NULL);
(void) sigaction(SIGPIPE, &sigact, NULL);
}
/*
 * Print the command line synopsis on stderr and exit with status 1.
 * Does not return.
 *
 * Long help lines are written as adjacent string literals so that no
 * literal spans a physical source line.
 */
static void
usage(
        char *av0 /* id string */
)
{
        (void)fprintf(stderr,
                "Usage: %s [options] feedname\t\nOptions:\n", av0);
        (void)fprintf(stderr,
                "\t-v Verbose, tell me about each product\n");
        (void)fprintf(stderr,
                "\t-r rawfile Stash 'raw' data in \"rawfile\"\n");
        (void)fprintf(stderr,
                "\t-l logfile Default logs to syslogd\n");
        (void)fprintf(stderr,
                "\t-f type Claim to be feedtype \"type\", one of \"hds\", "
                "\"ddplus\", ...\n");
        (void)fprintf(stderr,
                "\t-b baud Set baudrate for tty feed.\n");
        (void)fprintf(stderr,
                "\t-q queue default \"%s\"\n", DEFAULT_QUEUE);
        (void)fprintf(stderr,
                "\t-p [even|odd|none] Set parity for tty feed.\n");
        (void)fprintf(stderr,
                "\t-c Enable checksum or parity check on non tty "
                "feed\n");
        (void)fprintf(stderr,
                "\t-n Disable checksum or parity check on tty "
                "feed\n");
#if NET
        (void)fprintf(stderr,
                "\t-P port Get input via TCP connection to host "
                "\"feedname\" at \"port\"\n");
        (void)fprintf(stderr,
                "\t-T timeout Idle timeout before TCP reconnect, in "
                "seconds\n");
        (void)fprintf(stderr,
                "\t (defaults to %d, 0 disables timeout)\n",
                DEFAULT_RESET_SECS);
#endif
        exit(1);
}
/*
 * Hand one complete product to the product queue.
 *
 *   arrival  time stamp recorded as the product's arrival time
 *   seqno    sequence number recorded in the product info
 *   ident    product identifier string
 *   len      number of bytes in buf
 *   buf      product data
 *
 * The signature is the MD5 digest of the data; origin and feed type come
 * from the file-scope settings.  In verbose mode the product info is
 * logged.  With no queue open ("feedtest") nothing further is done.
 * A duplicate is counted and logged; any other insertion failure is
 * logged as an error and terminates the program with status 1.
 */
void
toClients(timestampt arrival,
        unsigned seqno,
        const char *ident,
        unsigned len,
        const char *buf)
{
        static struct product prod;
        int rc;

        /* the signature is the MD5 digest of the product data */
        MD5Init(md5ctxp);
        MD5Update(md5ctxp, (const unsigned char *)buf, len);
        MD5Final(prod.info.signature, md5ctxp);

        prod.info.arrival = arrival;
        prod.info.origin = myname;
        prod.info.feedtype = feedtype;
        prod.info.seqno = seqno;
        prod.info.ident = (char *)ident; /* cast away const */
        prod.info.sz = len;
        prod.data = (void *)buf; /* cast away const */

        if(ulogIsVerbose())
        {
                uinfo("%s", s_prod_info(NULL, 0, &prod.info, ulogIsDebug()));
        }

        /* if we are "feedtest", do nothing else */
        if(pq == NULL)
        {
                return;
        }

        rc = pq_insert(pq, &prod);
        if(rc != ENOERR)
        {
                if(rc != PQUEUE_DUP)
                {
                        /* a real failure: report it and give up */
                        const char *reason =
                                (rc > 0) ? strerror(rc) : "Internal error";

                        uerror("pq_insert: %s\n", reason);
                        exit(1); /* ??? */
                }

                ndups++;
                uinfo("Product already in queue");
        }
}
/*
 * Preset the global tty settings (baud, parity) for the given feed type.
 * Feed types without an entry below leave both settings untouched.
 */
static void
setFeedDefaults(feedtypet type)
{
        char *default_baud;
        char *default_parity;

        switch (type) {
        case DDPLUS :
                default_baud = "19200";
                default_parity = "even";
                break;
        case PPS :
        case DDS :
        case IDS :
                default_baud = "9600";
                default_parity = "even";
                break;
        case HDS :
                default_baud = "19200";
                default_parity = "none";
                break;
        case AFOS :
                default_baud = "4800"; /* ??? */
                default_parity = "none";
                break;
        case FAA604 :
                default_baud = "1200";
                default_parity = "even";
                break;
        default :
                /* no defaults known for this feed type */
                return;
        }

        baud = default_baud;
        parity = default_parity;
}
/*
 * Work out the feed type from the name the program was invoked under.
 *
 * Sets the global progname to the last path component of av0, maps that
 * name to a feed type (WMO when the name is not a known feed type),
 * applies the tty defaults for that type and returns it.
 */
static feedtypet
whatami(const char *av0)
{
#define SEP '/' /* separates components of path */
        const char *last_sep = strrchr(av0, SEP);
        feedtypet kind;

        /* strip off leading path */
        progname = (last_sep != NULL) ? last_sep + 1 : av0;

        kind = atofeedtypet(progname);
        if(kind == NONE)
        {
                kind = WMO; /* default for wmo ingestd */
        }

        setFeedDefaults(kind);
        return kind;
}
/*
 * Program entry point: read a data feed and insert the products found in
 * it into the product queue.
 *
 * Steps:
 *   1. deduce the feed type from av[0], then apply $LDMPQFNAME and the
 *      command line options (see usage());
 *   2. open the log, register cleanup() and the signal handlers;
 *   3. open the product queue (skipped when invoked as "feedtest");
 *   4. open the feed and select the scanner matching the feed type and
 *      the checksum/parity settings;
 *   5. loop: wait for input with select(), feed it to the scanner, serve
 *      statistics requests and, for socket feeds, reconnect after
 *      errors or after reset_secs idle seconds.
 *
 * Returns 0 once the loop ends normally (SIGTERM, end of a non-socket
 * feed, or too many failed reconnects) and 1 on a start-up or select()
 * failure.
 */
int
main(int ac, char *av[])
{
        /*
         * Seconds select() waits for input.  Idle time is accumulated
         * from this constant rather than from the timeval handed to
         * select(), because select() is allowed to overwrite that
         * timeval (on Linux it is left at zero after a timeout).
         */
        const long poll_secs = 3;
        int logfd;
        int width;
        int ready;
        unsigned long idle;
        fd_set readfds;
        fd_set exceptfds;
        struct timeval timeo;

        feedtype = whatami(av[0]);

        /*
         * Check the environment for some options.
         * May be overridden by command line switches below.
         */
        {
                const char *ldmpqfname = getenv("LDMPQFNAME");
                if(ldmpqfname != NULL)
                        pqfname = ldmpqfname;
        }

        {
                extern int optind;
                extern int opterr;
                extern char *optarg;
                int ch;
                int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE));

                opterr = 1;

                while ((ch = getopt(ac, av, "vxcnl:b:p:P:T:q:r:f:")) != EOF)
                        switch (ch) {
                        case 'v':
                                logmask |= LOG_MASK(LOG_INFO);
                                break;
                        case 'x':
                                logmask |= LOG_MASK(LOG_DEBUG);
                                break;
                        case 'c':
                                chkflag = CHK_CHECK;
                                break;
                        case 'n':
                                chkflag = CHK_DONT;
                                break;
                        case 'l':
                                /* "-" alone is allowed; "-x" is a swallowed option */
                                if(optarg[0] == '-' && optarg[1] != 0)
                                {
                                        fprintf(stderr, "logfile \"%s\" ??\n",
                                                optarg);
                                        usage(av[0]);
                                }
                                /* else */
                                logfname = optarg;
                                break;
                        case 'b':
                                baud = optarg;
                                break;
                        case 'p':
                                parity = optarg;
                                break;
#if NET
                        case 'P':
                                *((int *)&server_port) = atoi(optarg); /* cast away
                                                                          const */
                                /* valid TCP ports are 1..65535 */
                                if(server_port <= 0 || server_port > 65535)
                                {
                                        fprintf(stderr, "invalid server_port %s\n",
                                                optarg);
                                        usage(av[0]);
                                }
                                break;
                        case 'T':
                                reset_secs = atoi(optarg);
                                if(reset_secs < 0)
                                {
                                        fprintf(stderr, "invalid timeout %s\n",
                                                optarg);
                                        usage(av[0]);
                                }
                                break;
#endif /* NET */
                        case 'q':
                                pqfname = optarg;
                                break;
                        case 'r':
                                rawfname = optarg;
                                break;
                        case 'f':
                                {
                                        feedtypet type;
                                        type = atofeedtypet(optarg);
                                        if(type != NONE)
                                        {
                                                feedtype = type;
                                                /* keep tty settings already chosen */
                                                if(!parity && !baud)
                                                        setFeedDefaults(type);
                                        }
                                }
                                break;
                        case '?':
                                usage(av[0]);
                                break;
                        }

                /* last arg, feedfname, is required */
                if(ac - optind <= 0)
                        usage(av[0]);
                /*
                 * feedfname is static and therefore starts out empty, so
                 * this is a bounded, always terminated copy.
                 * NOTE(review): the reason for reserving 6 bytes is not
                 * visible in this file -- confirm before changing.
                 */
                strncat(feedfname,av[optind], sizeof(feedfname) - 6 );

                (void) setulogmask(logmask);
        }

        /*
         * initialize logger
         */
        if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0))
                (void) fclose(stderr);
        logfd = openulog(ubasename(av[0]),
                (LOG_CONS|LOG_PID), LOG_LDM, logfname);
        unotice("Starting Up");
        /* never pass data as the format string */
        udebug("%s", version);
        if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0))
        {
                /* unbuffered stream on the log descriptor, if we got one */
                FILE *logfp = fdopen(logfd, "a");
                if(logfp != NULL)
                        setbuf(logfp, NULL);
        }

        /*
         * register exit handler
         */
        if(atexit(cleanup) != 0)
        {
                serror("atexit");
                return 1;
        }

        /*
         * set up signal handlers
         */
        set_sigactions();

        /*
         * open the product queue, unless we were invoked as "feedtest"
         */
        if(strcmp(progname, "feedtest") != 0)
        {
                if((ready = pq_open(pqfname, PQ_DEFAULT, &pq)) != 0)
                {
                        /* only positive status values are errno codes */
                        uerror("pq_open: \"%s\" failed: %s",
                                pqfname,
                                ready > 0 ? strerror(ready) : "Internal error");
                        return 1;
                }
        }

        /*
         * who am i, anyway
         */
        (void) strncpy(myname, ghostname(), sizeof(myname) - 1);
        myname[sizeof(myname) - 1] = 0; /* long host names are truncated */

        /*
         * open the feed
         */
        if(!(*feedfname == '-' && feedfname[1] == 0) && logfd != 0)
                (void) close(0);

        if(open_feed(feedfname, &ifd) != ENOERR)
                return 1;

        /*
         * choose the scanner for this feed type
         */
        if (feedtype & HDS)
        {
                if(chkflag == CHK_CHECK
                                || (isatty(ifd) && chkflag != CHK_DONT))
                        setTheScanner(scan_wmo_binary_crc);
                else
                        setTheScanner(scan_wmo_binary);
        }
        else if (feedtype == AFOS)
        {
                prod_stats = afos_stats;
                setTheScanner(scan_afos);
        }
        else if (feedtype == NMC2) /* CONDUIT */
        {
                setTheScanner(scan_wmo_binary);
        }
        else if (feedtype == FAA604)
        {
                prod_stats = faa604_stats;
                if(chkflag == CHK_CHECK
                        || (isatty(ifd)
                                && chkflag != CHK_DONT
                                && parity != NULL
                                && *parity != 'n')
                        )
                {
                        setTheScanner(scan_faa604_parity);
                }
                else
                {
                        setTheScanner(scan_faa604);
                }
        }
        else
        {
                if(chkflag == CHK_CHECK
                        || (isatty(ifd)
                                && chkflag != CHK_DONT
                                && parity != NULL
                                && *parity != 'n')
                        )
                {
                        setTheScanner(scan_wmo_parity);
                }
                else
                {
                        setTheScanner(scan_wmo);
                }
        }

        /*
         * Allocate an MD5 context
         */
        md5ctxp = new_MD5_CTX();
        if(md5ctxp == NULL)
        {
                serror("new_md5_CTX failed");
                return 1;
        }

        /*
         * Main Loop
         */
        idle = 0;
        while(!done)
        {
#if NET
                if (INPUT_IS_SOCKET)
                {
                        if (port_error)
                        {
                                /*
                                 * lost connection => close
                                 */
                                if (ifd >= 0)
                                {
                                        if(feed_close)
                                                (*feed_close)(ifd);
                                        ifd = -1;
                                }
                                port_error = 0;
                                sleep (2); /* allow things to settle down */
                                continue;
                        }
                }
#endif
                if(stats_req)
                {
                        unotice("Statistics Request");
                        if(pq != NULL)
                        {
                                off_t highwater = 0;
                                size_t maxregions = 0;
                                (void) pq_highwater(pq, &highwater,
                                        &maxregions);
                                unotice(" Queue usage (bytes):%8ld",
                                        (long)highwater);
                                unotice(" (nregions):%8ld",
                                        (long)maxregions);
                        }
                        unotice(" Idle: %8lu seconds", idle);
#if NET
                        if (INPUT_IS_SOCKET)
                        {
                                unotice(" Timeout: %8d", reset_secs);
                        }
#endif
                        unotice("%21s: %s", "Status",
                                (ifd < 0) ?
                                "Not connected or input not open." :
                                "Connected.");
                        (*prod_stats)();
                        (*feed_stats)();
                        stats_req = 0;
                }
#if NET
                if (INPUT_IS_SOCKET)
                {
                        if (ifd < 0)
                        {
                                /* Attempt reconnect */
                                static int retries = 0;
                                if (retries > MAX_RETRIES)
                                {
                                        uerror ("maximum retry attempts %d, aborting",
                                                MAX_RETRIES);
                                        done = !0;
                                        continue;
                                }
                                /* Try to reopen on tcp read errors */
                                unotice("Trying to re-open connection on port %d",
                                        server_port);
                                ++retries;
                                if(open_feed(feedfname, &ifd) != ENOERR)
                                {
                                        /* back off a little longer each time */
                                        unotice ("sleeping %d seconds before retry %d",
                                                retries * RETRY_DELAY, retries+1);
                                        sleep (retries * RETRY_DELAY);
                                        continue;
                                }
                                retries = 0;
                        }
                }
#endif /* NET */
                /* select() may modify these, so set them up every pass */
                timeo.tv_sec = poll_secs;
                timeo.tv_usec = 0;
                FD_ZERO(&readfds);
                FD_ZERO(&exceptfds);
                FD_SET(ifd, &readfds);
                FD_SET(ifd, &exceptfds);
                width = ifd + 1;
                ready = select(width, &readfds, 0, &exceptfds, &timeo);
                if(ready < 0 )
                {
                        /* handle EINTR as a special case */
                        if(errno == EINTR)
                        {
                                errno = 0;
                                continue;
                        }
                        serror("select");
                        return 1;
                }
                /* else */
#if 0
                if (FD_ISSET(ifd, &exceptfds))
                {
                        uerror("Exception on input fd %d, select returned %d",
                                ifd, ready);
                }
#endif
                if(ready > 0)
                {
                        /* do some work */
                        if(FD_ISSET(ifd, &readfds) ||
                                FD_ISSET(ifd, &exceptfds))
                        {
                                idle = 0;
                                if(feedTheXbuf(ifd) != ENOERR)
                                {
#if NET
                                        if (INPUT_IS_SOCKET)
                                        {
                                                port_error = !0;
                                                continue;
                                        } /* else */
#endif /* NET */
                                        done = !0;
                                }
                                FD_CLR(ifd, &readfds);
                                FD_CLR(ifd, &exceptfds);
                        }
                        else
                        {
                                uerror("select returned %d but ifd not set",
                                        ready);
                                idle += poll_secs;
                        }
                }
                else /* ready == 0 */
                {
                        /* timed out: a full poll interval went by without input */
                        idle += poll_secs;
#if NET
                        if (INPUT_IS_SOCKET)
                        {
                                /* VOODOO
                                 * This is necessary to stimulate
                                 * 'Connection reset by peer'
                                 * when the Portmaster goes down and comes
                                 * back up.
                                 */
                                static char zed[1] = {0};
                                if(write(ifd, zed, sizeof(zed)) < 0)
                                {
                                        port_error = !0;
                                        continue;
                                }
                        }
#endif
                }
#if NET
                if (INPUT_IS_SOCKET)
                {
                        /* reset_secs is known to be positive before the cast */
                        if ((reset_secs > 0)
                                && (idle >= (unsigned long)reset_secs))
                        {
                                unotice("Idle for %lu seconds, reconnecting",
                                        idle);
                                /* force reconnect */
                                port_error = !0;
                                idle = 0;
                                continue;
                        }
                }
#endif /* NET */
                (void) scanTheXbuf();
        }

        return 0;
}