This archive contains answers to questions sent to Unidata support through mid-2025. Note that the archive is no longer being updated. We provide the archive for reference; many of the answers presented here remain technically correct, even if somewhat outdated. For the most up-to-date information on the use of NSF Unidata software and data services, please consult the Software Documentation first.
David, The pqing.c shipped with 5.0.6 had an AFOS bug in it. I'm including the fixed pqing.c code; this should work for you. Robb... On Tue, 2 Mar 1999, David Magee wrote: > > I've got a Sun Sparc 2 with a NOAA WeatherWire feed into ttya. > > Under ldm 5.0, the system works flawlessly. > > Under ldm 5.0.6, no data is seen coming in by LDM. > > In ldmd.conf: > exec "afos -v -b 4800 -p none /dev/ttya" > > I had similar problems trying to upgrade from 5.0 to 5.0.2 > > Any suggestions? Am I stuck at 5.0? I believe the problem lies within > pqing. > > > > Thanks- > > David > > > > > -- > > David K. Magee > address@hidden > > > =============================================================================== Robb Kambic Unidata Program Center Software Engineer III Univ. Corp for Atmospheric Research address@hidden WWW: http://www.unidata.ucar.edu/ ===============================================================================
/* * Copyright 1993, University Corporation for Atmospheric Research * See ../COPYRIGHT file for copying and redistribution conditions. */ /* $Id: pqing.c,v 1.66 1999/02/09 03:17:09 davis Exp $ */ static char version[] = "$Revision: 1.66 $ built "__DATE__" "__TIME__; #include <ldmconfig.h> #include <stdio.h> #include <stdlib.h> #include <signal.h> #include <string.h> #include <unistd.h> #include <limits.h> /* PATH_MAX */ #ifndef PATH_MAX #define PATH_MAX 255 #endif /* !PATH_MAX */ #include <sys/types.h> #include <sys/time.h> #include <rpc/rpc.h> #include <errno.h> #include "ulog.h" #include "inetutil.h" #include "ldm.h" #include "feed.h" #include "atofeedt.h" #include "ldmprint.h" #include "wmo_message.h" #include "afos_message.h" #include "faa604_message.h" #include "paths.h" #include "pq.h" #include "md5.h" #ifdef NO_ATEXIT #include "atexit.h" #endif #if NET #define RETRY_DELAY (10) /* delay factor between retries */ #define MAX_RETRIES (30) #endif /* NET */ /* set by command line */ char *baud = NULL; char *rawfname = NULL; char *parity = NULL; enum { CHK_UNSET, CHK_CHECK, CHK_DONT} chkflag = CHK_UNSET; static feedtypet feedtype = NONE; /* deduce from av[0] */ static const char *progname = NULL; static char *logfname = ""; static char feedfname[PATH_MAX]; static char myname[HOSTNAMESIZE]; static const char *pqfname = DEFAULT_QUEUE; static pqueue *pq = NULL; static int ifd = -1; static volatile int done = 0; static volatile int intr = 0; static volatile int stats_req = 0; static void (*prod_stats)(void) = wmo_stats; static unsigned long ndups = 0; static MD5_CTX *md5ctxp = NULL; #if NET static int port_error = 0; /* indicate sigpipe condition was raised */ # ifndef DEFAULT_RESET_SECS # define DEFAULT_RESET_SECS 600 # endif static int reset_secs = DEFAULT_RESET_SECS; #endif /* * called at exit */ static void cleanup(void) { unotice("Exiting"); if(!intr) { /* We are not in the interrupt context */ if(md5ctxp != NULL) { free_MD5_CTX(md5ctxp); } if(pq != NULL) { 
off_t highwater = 0; size_t maxregions = 0; (void) pq_highwater(pq, &highwater, &maxregions); (void) pq_close(pq); pq = NULL; if(feed_close) (*feed_close)(ifd); ifd = -1; unotice(" Queue usage (bytes):%8ld", (long)highwater); unotice(" (nregions):%8ld", (long)maxregions); unotice(" Duplicates rejected:%8lu", ndups); } (*prod_stats)(); (*feed_stats)(); } (void) closeulog(); } /* * called upon receipt of signals */ static void signal_handler(int sig) { #ifdef SVR3SIGNALS /* * Some systems reset handler to SIG_DFL upon entry to handler. * In that case, we reregister our handler. */ (void) signal(sig, signal_handler); #endif switch(sig) { case SIGINT : unotice("Interrupt"); intr = !0; exit(0); case SIGTERM : udebug("SIGTERM"); done = !0; return; case SIGPIPE : #if NET if(INPUT_IS_SOCKET) { unotice("SIGPIPE"); port_error = !0; } #endif return; case SIGUSR1 : udebug("SIGUSR1"); stats_req = !0; return; case SIGUSR2 : udebug("SIGUSR2"); if (toggleulogpri(LOG_INFO)) unotice("Going verbose"); else unotice("Going silent"); return; } udebug("signal_handler: unhandled signal: %d", sig); } /* * register the signal_handler */ static void set_sigactions(void) { struct sigaction sigact; sigemptyset(&sigact.sa_mask); sigact.sa_flags = 0; /* Ignore these */ sigact.sa_handler = SIG_IGN; (void) sigaction(SIGHUP, &sigact, NULL); (void) sigaction(SIGALRM, &sigact, NULL); (void) sigaction(SIGCHLD, &sigact, NULL); /* Handle these */ #ifdef SA_RESTART /* SVR4, 4.3+ BSD */ /* usually, restart system calls */ sigact.sa_flags |= SA_RESTART; #endif sigact.sa_handler = signal_handler; (void) sigaction(SIGTERM, &sigact, NULL); (void) sigaction(SIGUSR1, &sigact, NULL); (void) sigaction(SIGUSR2, &sigact, NULL); /* Don't restart after interrupt */ sigact.sa_flags = 0; #ifdef SA_INTERRUPT /* SunOS 4.x */ sigact.sa_flags |= SA_INTERRUPT; #endif (void) sigaction(SIGINT, &sigact, NULL); (void) sigaction(SIGPIPE, &sigact, NULL); } static void usage( char *av0 /* id string */ ) { (void)fprintf(stderr, 
"Usage: %s [options] feedname\t\nOptions:\n", av0); (void)fprintf(stderr, "\t-v Verbose, tell me about each product\n"); (void)fprintf(stderr, "\t-r rawfile Stash 'raw' data in \"rawfile\"\n"); (void)fprintf(stderr, "\t-l logfile Default logs to syslogd\n"); (void)fprintf(stderr, "\t-f type Claim to be feedtype \"type\", one of \"hds\", \"ddplus\", ...\n"); (void)fprintf(stderr, "\t-b baud Set baudrate for tty feed.\n"); (void)fprintf(stderr, "\t-q queue default \"%s\"\n", DEFAULT_QUEUE); (void)fprintf(stderr, "\t-p [even|odd|none] Set parity for tty feed.\n"); (void)fprintf(stderr, "\t-c Enable checksum or parity check on non tty feed\n"); (void)fprintf(stderr, "\t-n Disable checksum or parity check on tty feed\n"); #if NET (void)fprintf(stderr, "\t-P port Get input via TCP connection to host \"feedname\" at \"port\"\n"); (void)fprintf(stderr, "\t-T timeout Idle timeout before TCP reconnect, in seconds\n"); (void)fprintf(stderr, "\t (defaults to %d, 0 disables timeout)\n", DEFAULT_RESET_SECS); #endif exit(1); } void toClients(timestampt arrival, unsigned seqno, const char *ident, unsigned len, const char *buf) { static struct product prod; int status; prod.info.arrival = arrival; MD5Init(md5ctxp); MD5Update(md5ctxp, (const unsigned char *)buf, len); MD5Final(prod.info.signature, md5ctxp); prod.info.origin = myname; prod.info.feedtype = feedtype; prod.info.seqno = seqno; prod.info.ident = (char *)ident; /* cast away const */ prod.info.sz = len; prod.data = (void *)buf; /* cast away const */ if(ulogIsVerbose()) uinfo("%s", s_prod_info(NULL, 0, &prod.info, ulogIsDebug())); if(pq == NULL) /* if we are "feedtest", do nothing else */ return; status = pq_insert(pq, &prod); if(status == ENOERR) return; /* Normal return */ /* else */ if(status == PQUEUE_DUP) { ndups++; uinfo("Product already in queue"); return; } /* else, error */ uerror("pq_insert: %s\n", status > 0 ? strerror(status) : "Internal error"); exit(1); /* ??? 
*/ } static void setFeedDefaults(feedtypet type) { /* set up defaults for feed according to type */ switch (type) { case DDPLUS : baud = "19200"; parity = "even"; break; case PPS : case DDS : case IDS : baud = "9600"; parity = "even"; break; case HDS : baud = "19200"; parity = "none"; break; case AFOS : baud = "4800"; /* ??? */ parity = "none"; break; case FAA604 : baud = "1200"; parity = "even"; break; } } static feedtypet whatami(const char *av0) { feedtypet type; #define SEP '/' /* separates components of path */ /* strip off leading path */ if ((progname = strrchr(av0, SEP)) == NULL) progname = av0; else progname++; type = atofeedtypet(progname); if(type == NONE) type = WMO; /* default for wmo ingestd */ setFeedDefaults(type); return type; } int main(int ac, char *av[]) { int logfd; int width; int ready; unsigned long idle; fd_set readfds; fd_set exceptfds; struct timeval timeo; feedtype = whatami(av[0]); /* * Check the environment for some options. * May be overridden by command line switches below. 
*/ { const char *ldmpqfname = getenv("LDMPQFNAME"); if(ldmpqfname != NULL) pqfname = ldmpqfname; } { extern int optind; extern int opterr; extern char *optarg; int ch; int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE)); opterr = 1; while ((ch = getopt(ac, av, "vxcnl:b:p:P:T:q:r:f:")) != EOF) switch (ch) { case 'v': logmask |= LOG_MASK(LOG_INFO); break; case 'x': logmask |= LOG_MASK(LOG_DEBUG); break; case 'c': chkflag = CHK_CHECK; break; case 'n': chkflag = CHK_DONT; break; case 'l': if(optarg[0] == '-' && optarg[1] != 0) { fprintf(stderr, "logfile \"%s\" ??\n", optarg); usage(av[0]); } /* else */ logfname = optarg; break; case 'b': baud = optarg; break; case 'p': parity = optarg; break; #if NET case 'P': *((int *)&server_port) = atoi(optarg); /* cast away const */ if(server_port <= 0 || server_port > 65536) { fprintf(stderr, "invalid server_port %s\n", optarg); usage(av[0]); } break; case 'T': reset_secs = atoi(optarg); if(reset_secs < 0) { fprintf(stderr, "invalid timeout %s\n", optarg); usage(av[0]); } break; #endif /* NET */ case 'q': pqfname = optarg; break; case 'r': rawfname = optarg; break; case 'f': { feedtypet type; type = atofeedtypet(optarg); if(type != NONE) { feedtype = type; if(!parity && !baud) setFeedDefaults(type); } } break; case '?': usage(av[0]); break; } /* last arg, feedfname, is required */ if(ac - optind <= 0) usage(av[0]); strncat(feedfname,av[optind], sizeof(feedfname) - 6 ); (void) setulogmask(logmask); } /* * initialize logger */ if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0)) (void) fclose(stderr); logfd = openulog(ubasename(av[0]), (LOG_CONS|LOG_PID), LOG_LDM, logfname); unotice("Starting Up"); udebug(version); if(logfname == NULL || !(*logfname == '-' && logfname[1] == 0)) { setbuf(fdopen(logfd, "a"), NULL); } /* * register exit handler */ if(atexit(cleanup) != 0) { serror("atexit"); return 1; } /* * set up signal handlers */ set_sigactions(); /* * open the product queue, unless we were invoked as "feedtest" */ 
if(strcmp(progname, "feedtest") != 0) { if(ready = pq_open(pqfname, PQ_DEFAULT, &pq)) { uerror("pq_open: \"%s\" failed: %s", pqfname, strerror(ready)); return 1; } } /* * who am i, anyway */ (void) strcpy(myname, ghostname()); /* * open the feed */ if(!(*feedfname == '-' && feedfname[1] == 0) && logfd != 0) (void) close(0); if(open_feed(feedfname, &ifd) != ENOERR) return 1; if (feedtype & HDS) { if(chkflag == CHK_CHECK || (isatty(ifd) && chkflag != CHK_DONT)) setTheScanner(scan_wmo_binary_crc); else setTheScanner(scan_wmo_binary); } else if (feedtype == AFOS) { prod_stats = afos_stats; setTheScanner(scan_afos); } else if (feedtype == NMC2) /* CONDUIT */ { setTheScanner(scan_wmo_binary); } else if (feedtype == FAA604) { prod_stats = faa604_stats; if(chkflag == CHK_CHECK || (isatty(ifd) && chkflag != CHK_DONT && parity != NULL && *parity != 'n') ) { setTheScanner(scan_faa604_parity); } else { setTheScanner(scan_faa604); } } else { if(chkflag == CHK_CHECK || (isatty(ifd) && chkflag != CHK_DONT && parity != NULL && *parity != 'n') ) { setTheScanner(scan_wmo_parity); } else { setTheScanner(scan_wmo); } } /* * Allocate an MD5 context */ md5ctxp = new_MD5_CTX(); if(md5ctxp == NULL) { serror("new_md5_CTX failed"); return 1; } /* * Main Loop */ idle = 0; while(!done) { #if NET if (INPUT_IS_SOCKET) { if (port_error) { /* * lost connection => close */ if (ifd >= 0) { if(feed_close) (*feed_close)(ifd); ifd = -1; } port_error = 0; sleep (2); /* allow things to settle down */ continue; } } #endif if(stats_req) { unotice("Statistics Request"); if(pq != NULL) { off_t highwater = 0; size_t maxregions = 0; (void) pq_highwater(pq, &highwater, &maxregions); unotice(" Queue usage (bytes):%8ld", (long)highwater); unotice(" (nregions):%8ld", (long)maxregions); } unotice(" Idle: %8lu seconds", idle); #if NET if (INPUT_IS_SOCKET) { unotice(" Timeout: %8d", reset_secs); } #endif unotice("%21s: %s", "Status", (ifd < 0) ? "Not connected or input not open." 
: "Connected."); (*prod_stats)(); (*feed_stats)(); stats_req = 0; } #if NET if (INPUT_IS_SOCKET) { if (ifd < 0) { /* Attempt reconnect */ static int retries = 0; if (retries > MAX_RETRIES) { uerror ("maximum retry attempts %d, aborting", MAX_RETRIES); done = !0; continue; } /* Try to reopen on tcp read errors */ unotice("Trying to re-open connection on port %d", server_port); ++retries; if(open_feed(feedfname, &ifd) != ENOERR) { unotice ("sleeping %d seconds before retry %d", retries * RETRY_DELAY, retries+1); sleep (retries * RETRY_DELAY); continue; } retries = 0; } } #endif /* NET */ timeo.tv_sec = 3; timeo.tv_usec = 0; FD_ZERO(&readfds); FD_ZERO(&exceptfds); FD_SET(ifd, &readfds); FD_SET(ifd, &exceptfds); width = ifd + 1; ready = select(width, &readfds, 0, &exceptfds, &timeo); if(ready < 0 ) { /* handle EINTR as a special case */ if(errno == EINTR) { errno = 0; continue; } serror("select"); return 1; } /* else */ #if 0 if (FD_ISSET(ifd, &exceptfds)) { uerror("Exception on input fd %d, select returned %d", ifd, ready); } #endif if(ready > 0) { /* do some work */ if(FD_ISSET(ifd, &readfds) || FD_ISSET(ifd, &exceptfds)) { idle = 0; if(feedTheXbuf(ifd) != ENOERR) { #if NET if (INPUT_IS_SOCKET) { port_error = !0; continue; } /* else */ #endif /* NET */ done = !0; } FD_CLR(ifd, &readfds); FD_CLR(ifd, &exceptfds); } else { uerror("select returned %d but ifd not set", ready); idle += timeo.tv_sec; } } else /* ready == 0 */ { idle += timeo.tv_sec; #if NET if (INPUT_IS_SOCKET) { /* VOODOO * This is necessary to stimulate * 'Connection reset by peer' * when the Portmaster goes down and comes * back up. */ static char zed[1] = {0}; if(write(ifd, zed, sizeof(zed)) < 0) { port_error = !0; continue; } } #endif } #if NET if (INPUT_IS_SOCKET) { if ((reset_secs > 0) && (idle >= reset_secs)) { unotice("Idle for %ld seconds, reconnecting", idle); /* force reconnect */ port_error = !0; idle = 0; continue; } } #endif /* NET */ (void) scanTheXbuf(); } return 0; }