This archive contains answers to questions sent to Unidata support through mid-2025. Note that the archive is no longer being updated. We provide the archive for reference; many of the answers presented here remain technically correct, even if somewhat outdated. For the most up-to-date information on the use of NSF Unidata software and data services, please consult the Software Documentation first.
> >To: address@hidden > >From: Craig Searcy <address@hidden> > >Subject: ldm/pq question > >Organization: . > >Keywords: 200003031852.LAA04400 > > Hi, > I have just started using the LDM on our LDAD at the Anchorage forecast > office. I've looked through manuals and man pages and can't see any > obvious answer to my question. > > I would like to know if it is possible to scan a queue for a product and > determine the originating host that the product came from. We have a > network with several sites running ldm with the same or similar product > feeds and I would like to distinguish data from each host. > > Any help or pointers with this would be appreciated. Thanks in advance, > > Craig Searcy > Computer Specialist > NWS Anchorage Forecast Office Hi Craig, In response to your question (and for our own purposes) we've added an option to the pqcat program to display a product's origin. pqcat now takes a -O option, valid only with the -v option, that will display a product's originating host. I have attached a new pqcat.c to this message for you. If you have a source distribution, just replace the pqcat.c file in the pqcat directory with this new file, then execute 'make pqcat'. If you don't have a source distribution, you'll need to wait a bit - we'll be making a new source distribution relatively soon, and a new binary distribution in a few weeks. Or, of course, you can grab an old source distribution if you want the change ASAP. Anne -- *************************************************** Anne Wilson UCAR Unidata Program address@hidden P.O. Box 3000 Boulder, CO 80307 ---------------------------------------------------- Unidata WWW server http://www.unidata.ucar.edu/ ****************************************************
/* * Copyright 1993, University Corporation for Atmospheric Research * See ../COPYRIGHT file for copying and redistribution conditions. */ /* $Id: pqcat.c,v 1.34 2000/03/09 20:17:24 anne Exp $ */ /* * dump a pq */ #include <ldmconfig.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <rpc/rpc.h> #include <signal.h> #include <unistd.h> #include <errno.h> #include <assert.h> #include <regex.h> #include "ldm.h" #include "atofeedt.h" #include "ldmprint.h" #include "ulog.h" #include "pq.h" #include "paths.h" #include "md5.h" #ifdef NO_ATEXIT #include "atexit.h" #endif /* default "one trip" */ #ifndef DEFAULT_INTERVAL #define DEFAULT_INTERVAL 0 #endif #ifndef DEFAULT_FEEDTYPE #define DEFAULT_FEEDTYPE ANY #endif static volatile int done = 0; static volatile int intr = 0; static volatile int stats_req = 0; static int showProdOrigin = 0; static const char *pqfname = DEFAULT_QUEUE; static pqueue *pq = NULL; static int nprods = 0; static MD5_CTX *md5ctxp = NULL; static void dump_stats(void) { unotice("Number of products %d", nprods); } /* */ /*ARGSUSED*/ static int writeprod(const prod_info *infop, const void *datap, void *xprod, size_t size, void *notused) { if(ulogIsVerbose()) { if (showProdOrigin) uinfo("%s %s", s_prod_info(NULL, 0, infop, ulogIsDebug()), infop->origin); else uinfo("%s", s_prod_info(NULL, 0, infop, ulogIsDebug())); } if(md5ctxp != NULL) /* -c option */ { signaturet check; MD5Init(md5ctxp); MD5Update(md5ctxp, datap, infop->sz); MD5Final(check, md5ctxp); if(memcmp(infop->signature, check, sizeof(signaturet)) != 0) { char sb[33]; char cb[33]; uerror("signature mismatch: %s != %s", s_signaturet(sb, sizeof(sb), infop->signature), s_signaturet(cb, sizeof(cb), check)); } } if( write(STDOUT_FILENO, datap, infop->sz) != infop->sz) { int errnum = errno; serror( "data write failed") ; return errnum; } nprods++; return 0; } static void usage(const char *av0) /* id string */ { (void)fprintf(stderr, "Usage: %s [options] [outputfile]\n\tOptions:\n", av0); (void)fprintf(stderr, "\t-v Verbose, tell me about each product\n"); (void)fprintf(stderr, "\t-l logfile Log to a file rather than stderr\n"); (void)fprintf(stderr, "\t-f feedtype Scan for data of type \"feedtype\" (default \"%s\")\n", s_feedtypet(DEFAULT_FEEDTYPE)); (void)fprintf(stderr, "\t-p pattern Interested in products matching \"pattern\" (default \".*\")\n") ; (void)fprintf(stderr, "\t-q pqfname (default \"%s\")\n", DEFAULT_QUEUE); (void)fprintf(stderr, "\t-o offset Set the \"from\" time \"offset\" secs before now\n"); (void)fprintf(stderr, "\t (default \"from\" the beginning of the epoch)\n"); (void)fprintf(stderr, "\t-i interval Poll queue after \"interval\" secs (default %d)\n", DEFAULT_INTERVAL); (void)fprintf(stderr, "\t (\"interval\" of 0 means exit at end of queue)\n"); (void)fprintf(stderr, "\t-c Check, verify MD5 signature\n"); (void)fprintf(stderr, "\t-O Include product origin in verbose output\n"); (void)fprintf(stderr, "\t (valid only with -v option)\n"); (void)fprintf(stderr, "Output defaults to standard output\n"); exit(1); } static void cleanup(void) { unotice("Exiting"); if(!intr) { if(md5ctxp != NULL) free_MD5_CTX(md5ctxp); if(pq != NULL) (void)pq_close(pq); } dump_stats(); (void) closeulog(); } static void signal_handler(int sig) { #ifdef SVR3SIGNALS /* * Some systems reset handler to SIG_DFL upon entry to handler. * In that case, we reregister our handler. */ (void) signal(sig, signal_handler); #endif switch(sig) { case SIGINT : unotice("Interrupt"); intr = !0; exit(0); case SIGTERM : udebug("SIGTERM"); done = !0; return; case SIGUSR1 : udebug("SIGUSR1"); stats_req = !0; return; case SIGUSR2 : udebug("SIGUSR2"); rollulogpri(); return; } udebug("signal_handler: unhandled signal: %d", sig); } /* * register the signal_handler */ static void set_sigactions(void) { struct sigaction sigact; sigemptyset(&sigact.sa_mask); sigact.sa_flags = 0; /* Ignore these */ sigact.sa_handler = SIG_IGN; (void) sigaction(SIGHUP, &sigact, NULL); (void) sigaction(SIGPIPE, &sigact, NULL); (void) sigaction(SIGALRM, &sigact, NULL); (void) sigaction(SIGCHLD, &sigact, NULL); /* Handle these */ #ifdef SA_RESTART /* SVR4, 4.3+ BSD */ /* usually, restart system calls */ sigact.sa_flags |= SA_RESTART; #endif sigact.sa_handler = signal_handler; (void) sigaction(SIGTERM, &sigact, NULL); (void) sigaction(SIGUSR1, &sigact, NULL); (void) sigaction(SIGUSR2, &sigact, NULL); /* Don't restart after interrupt */ sigact.sa_flags = 0; #ifdef SA_INTERRUPT /* SunOS 4.x */ sigact.sa_flags |= SA_INTERRUPT; #endif (void) sigaction(SIGINT, &sigact, NULL); } main(int ac, char *av[]) { const char *progname = ubasename(av[0]); char *logfname; prod_class clss; prod_spec spec; int status = 0; int interval = DEFAULT_INTERVAL; int logoptions = (LOG_CONS|LOG_PID) ; logfname = ""; if(isatty(fileno(stderr))) { /* set interactive defaults */ logfname = "-" ; logoptions = 0 ; } clss.from = TS_ZERO; /* default dump the whole file */ clss.to = TS_ENDT; clss.psa.psa_len = 1; clss.psa.psa_val = &spec; spec.feedtype = ANY; spec.pattern = ".*"; /* * Check the environment for some options. * May be overridden by command line switches below. */ { const char *ldmpqfname = getenv("LDMPQFNAME"); if(ldmpqfname != NULL) pqfname = ldmpqfname; } { extern int optind; extern int opterr; extern char *optarg; int ch; int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE)); int fterr; opterr = 1; while ((ch = getopt(ac, av, "cvxOl:p:f:q:o:i:")) != EOF) switch (ch) { case 'c': md5ctxp = new_MD5_CTX(); if(md5ctxp == NULL) serror("new_md5_CTX failed"); break; case 'v': logmask |= LOG_MASK(LOG_INFO); break; case 'x': logmask |= LOG_MASK(LOG_DEBUG); break; case 'l': logfname = optarg; break; case 'p': spec.pattern = optarg; /* compiled below */ break; case 'f': fterr = strfeedtypet(optarg, &spec.feedtype) ; if(fterr != FEEDTYPE_OK) { fprintf(stderr, "Bad feedtype \"%s\", %s\n", optarg, strfeederr(fterr)) ; usage(progname); } break; case 'q': pqfname = optarg; break; case 'o': (void) set_timestamp(&clss.from); clss.from.tv_sec -= atoi(optarg); break; case 'i': interval = atoi(optarg); if(interval == 0 && *optarg != '0') { fprintf(stderr, "%s: invalid interval %s", progname, optarg); usage(progname); } break; case 'O': showProdOrigin = 1; break; case '?': usage(progname); break; } (void) setulogmask(logmask); status = regcomp(&spec.rgx, spec.pattern, REG_EXTENDED|REG_NOSUB); if(status != 0) { fprintf(stderr, "Bad regular expression \"%s\"\n", spec.pattern); usage(av[0]); } /* last arg, outputfname, is optional */ if(ac - optind > 0) { const char *const outputfname = av[optind]; if(freopen(outputfname, "a+b", stdout) == NULL) { status = errno; fprintf(stderr, "%s: Couldn't open \"%s\": %s\n", progname, outputfname, strerror(status)); } } } /* * Set up error logging. */ (void) openulog(progname, logoptions, LOG_LDM, logfname); unotice("Starting Up (%d)", getpgrp()); /* * register exit handler */ if(atexit(cleanup) != 0) { serror("atexit"); exit(1); } /* * set up signal handlers */ set_sigactions(); /* * Open the product que */ status = pq_open(pqfname, PQ_READONLY, &pq); if(status) { uerror("pq_open failed: %s: %s\n", pqfname, strerror(status)); exit(1); } while(!done) { if(stats_req) { dump_stats(); stats_req = 0; } status = pq_sequence(pq, TV_GT, &clss, writeprod, 0); switch(status) { case 0: /* no error */ continue; /* N.B., other cases sleep */ case PQUEUE_END: udebug("End of Queue"); break; case EAGAIN: case EACCES: udebug("Hit a lock"); break; default: uerror("pq_sequence failed: %s (errno = %d)", strerror(status), status); exit(1); break; } if(interval == 0) break; pq_suspend(interval); } exit(0); /*NOTREACHED*/ }