[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: 20000303: ldm/pq question (fwd)
- Subject: Re: 20000303: ldm/pq question (fwd)
- Date: Thu, 09 Mar 2000 14:01:50 -0700
> >To: address@hidden
> >From: Craig Searcy <address@hidden>
> >Subject: ldm/pq question
> >Organization: .
> >Keywords: 200003031852.LAA04400
>
> Hi,
> I have just started using the LDM on our LDAD at the Anchorage forecast
> office. I've looked through manuals and man pages and can't see any
> obvious answer to my question.
>
> I would like to know if it is possible to scan a queue for a product and
> determine the originating host that the product came from. We have a
> network with several sites running ldm with the same or similar product
> feeds and I would like to distinguish data from each host.
>
> Any help or pointers with this would be appreciated. Thanks in advance,
>
> Craig Searcy
> Computer Specialist
> NWS Anchorage Forecast Office
Hi Craig,
In response to your question (and for our own purposes) we've added an option to
the pqcat program to display a product's origin. pqcat now takes a -O option,
valid only with the -v option, that will display a product's originating host.
I have attached a new pqcat.c to this message for you. If you have a source
distribution, just replace the pqcat.c file in the pqcat directory with this new
file, then execute 'make pqcat'. If you don't have a source distribution,
you'll
need to wait a bit - we'll be making a new source distribution relatively soon,
and a new binary distribution in a few weeks. Or, of course, you can grab an
old source distribution if you want the change ASAP.
Anne
--
***************************************************
Anne Wilson UCAR Unidata Program
address@hidden P.O. Box 3000
Boulder, CO 80307
----------------------------------------------------
Unidata WWW server http://www.unidata.ucar.edu/
****************************************************
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: pqcat.c,v 1.34 2000/03/09 20:17:24 anne Exp $ */
/*
* dump a pq
*/
#include <ldmconfig.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <rpc/rpc.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include <regex.h>
#include "ldm.h"
#include "atofeedt.h"
#include "ldmprint.h"
#include "ulog.h"
#include "pq.h"
#include "paths.h"
#include "md5.h"
#ifdef NO_ATEXIT
#include "atexit.h"
#endif
/* default "one trip" */
#ifndef DEFAULT_INTERVAL
#define DEFAULT_INTERVAL 0
#endif
#ifndef DEFAULT_FEEDTYPE
#define DEFAULT_FEEDTYPE ANY
#endif
static volatile int done = 0;
static volatile int intr = 0;
static volatile int stats_req = 0;
static int showProdOrigin = 0;
static const char *pqfname = DEFAULT_QUEUE;
static pqueue *pq = NULL;
static int nprods = 0;
static MD5_CTX *md5ctxp = NULL;
static void
dump_stats(void)
{
unotice("Number of products %d", nprods);
}
/*
*/
/*ARGSUSED*/
static int
writeprod(const prod_info *infop, const void *datap,
void *xprod, size_t size, void *notused)
{
if(ulogIsVerbose())
{
if (showProdOrigin)
uinfo("%s %s", s_prod_info(NULL, 0, infop,
ulogIsDebug()), infop->origin);
else
uinfo("%s", s_prod_info(NULL, 0, infop, ulogIsDebug()));
}
if(md5ctxp != NULL) /* -c option */
{
signaturet check;
MD5Init(md5ctxp);
MD5Update(md5ctxp, datap, infop->sz);
MD5Final(check, md5ctxp);
if(memcmp(infop->signature, check, sizeof(signaturet)) != 0)
{
char sb[33]; char cb[33];
uerror("signature mismatch: %s != %s",
s_signaturet(sb, sizeof(sb), infop->signature),
s_signaturet(cb, sizeof(cb), check));
}
}
if( write(STDOUT_FILENO, datap, infop->sz) !=
infop->sz)
{
int errnum = errno;
serror( "data write failed") ;
return errnum;
}
nprods++;
return 0;
}
static void
usage(const char *av0) /* id string */
{
(void)fprintf(stderr,
"Usage: %s [options] [outputfile]\n\tOptions:\n", av0);
(void)fprintf(stderr,
"\t-v Verbose, tell me about each product\n");
(void)fprintf(stderr,
"\t-l logfile Log to a file rather than stderr\n");
(void)fprintf(stderr,
"\t-f feedtype Scan for data of type \"feedtype\" (default
\"%s\")\n", s_feedtypet(DEFAULT_FEEDTYPE));
(void)fprintf(stderr,
"\t-p pattern Interested in products matching \"pattern\"
(default \".*\")\n") ;
(void)fprintf(stderr,
"\t-q pqfname (default \"%s\")\n", DEFAULT_QUEUE);
(void)fprintf(stderr,
"\t-o offset Set the \"from\" time \"offset\" secs before
now\n");
(void)fprintf(stderr,
"\t (default \"from\" the beginning of the
epoch)\n");
(void)fprintf(stderr,
"\t-i interval Poll queue after \"interval\" secs (default
%d)\n",
DEFAULT_INTERVAL);
(void)fprintf(stderr,
"\t (\"interval\" of 0 means exit at end of
queue)\n");
(void)fprintf(stderr,
"\t-c Check, verify MD5 signature\n");
(void)fprintf(stderr,
"\t-O Include product origin in verbose output\n");
(void)fprintf(stderr,
"\t (valid only with -v option)\n");
(void)fprintf(stderr,
"Output defaults to standard output\n");
exit(1);
}
static void
cleanup(void)
{
unotice("Exiting");
if(!intr)
{
if(md5ctxp != NULL)
free_MD5_CTX(md5ctxp);
if(pq != NULL)
(void)pq_close(pq);
}
dump_stats();
(void) closeulog();
}
static void
signal_handler(int sig)
{
#ifdef SVR3SIGNALS
/*
* Some systems reset handler to SIG_DFL upon entry to handler.
* In that case, we reregister our handler.
*/
(void) signal(sig, signal_handler);
#endif
switch(sig) {
case SIGINT :
unotice("Interrupt");
intr = !0;
exit(0);
case SIGTERM :
udebug("SIGTERM");
done = !0;
return;
case SIGUSR1 :
udebug("SIGUSR1");
stats_req = !0;
return;
case SIGUSR2 :
udebug("SIGUSR2");
rollulogpri();
return;
}
udebug("signal_handler: unhandled signal: %d", sig);
}
/*
* register the signal_handler
*/
static void
set_sigactions(void)
{
struct sigaction sigact;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = 0;
/* Ignore these */
sigact.sa_handler = SIG_IGN;
(void) sigaction(SIGHUP, &sigact, NULL);
(void) sigaction(SIGPIPE, &sigact, NULL);
(void) sigaction(SIGALRM, &sigact, NULL);
(void) sigaction(SIGCHLD, &sigact, NULL);
/* Handle these */
#ifdef SA_RESTART /* SVR4, 4.3+ BSD */
/* usually, restart system calls */
sigact.sa_flags |= SA_RESTART;
#endif
sigact.sa_handler = signal_handler;
(void) sigaction(SIGTERM, &sigact, NULL);
(void) sigaction(SIGUSR1, &sigact, NULL);
(void) sigaction(SIGUSR2, &sigact, NULL);
/* Don't restart after interrupt */
sigact.sa_flags = 0;
#ifdef SA_INTERRUPT /* SunOS 4.x */
sigact.sa_flags |= SA_INTERRUPT;
#endif
(void) sigaction(SIGINT, &sigact, NULL);
}
main(int ac, char *av[])
{
const char *progname = ubasename(av[0]);
char *logfname;
prod_class clss;
prod_spec spec;
int status = 0;
int interval = DEFAULT_INTERVAL;
int logoptions = (LOG_CONS|LOG_PID) ;
logfname = "";
if(isatty(fileno(stderr)))
{
/* set interactive defaults */
logfname = "-" ;
logoptions = 0 ;
}
clss.from = TS_ZERO; /* default dump the whole file */
clss.to = TS_ENDT;
clss.psa.psa_len = 1;
clss.psa.psa_val = &spec;
spec.feedtype = ANY;
spec.pattern = ".*";
/*
* Check the environment for some options.
* May be overridden by command line switches below.
*/
{
const char *ldmpqfname = getenv("LDMPQFNAME");
if(ldmpqfname != NULL)
pqfname = ldmpqfname;
}
{
extern int optind;
extern int opterr;
extern char *optarg;
int ch;
int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE));
int fterr;
opterr = 1;
while ((ch = getopt(ac, av, "cvxOl:p:f:q:o:i:")) != EOF)
switch (ch) {
case 'c':
md5ctxp = new_MD5_CTX();
if(md5ctxp == NULL)
serror("new_md5_CTX failed");
break;
case 'v':
logmask |= LOG_MASK(LOG_INFO);
break;
case 'x':
logmask |= LOG_MASK(LOG_DEBUG);
break;
case 'l':
logfname = optarg;
break;
case 'p':
spec.pattern = optarg;
/* compiled below */
break;
case 'f':
fterr = strfeedtypet(optarg, &spec.feedtype) ;
if(fterr != FEEDTYPE_OK)
{
fprintf(stderr, "Bad feedtype \"%s\", %s\n",
optarg, strfeederr(fterr)) ;
usage(progname);
}
break;
case 'q':
pqfname = optarg;
break;
case 'o':
(void) set_timestamp(&clss.from);
clss.from.tv_sec -= atoi(optarg);
break;
case 'i':
interval = atoi(optarg);
if(interval == 0 && *optarg != '0')
{
fprintf(stderr, "%s: invalid interval %s",
progname, optarg);
usage(progname);
}
break;
case 'O':
showProdOrigin = 1;
break;
case '?':
usage(progname);
break;
}
(void) setulogmask(logmask);
status = regcomp(&spec.rgx,
spec.pattern,
REG_EXTENDED|REG_NOSUB);
if(status != 0)
{
fprintf(stderr, "Bad regular expression \"%s\"\n",
spec.pattern);
usage(av[0]);
}
/* last arg, outputfname, is optional */
if(ac - optind > 0)
{
const char *const outputfname = av[optind];
if(freopen(outputfname, "a+b", stdout) == NULL)
{
status = errno;
fprintf(stderr, "%s: Couldn't open \"%s\": %s\n",
progname, outputfname, strerror(status));
}
}
}
/*
* Set up error logging.
*/
(void) openulog(progname,
logoptions, LOG_LDM, logfname);
unotice("Starting Up (%d)", getpgrp());
/*
* register exit handler
*/
if(atexit(cleanup) != 0)
{
serror("atexit");
exit(1);
}
/*
* set up signal handlers
*/
set_sigactions();
/*
* Open the product que
*/
status = pq_open(pqfname, PQ_READONLY, &pq);
if(status)
{
uerror("pq_open failed: %s: %s\n",
pqfname, strerror(status));
exit(1);
}
while(!done)
{
if(stats_req)
{
dump_stats();
stats_req = 0;
}
status = pq_sequence(pq, TV_GT, &clss, writeprod, 0);
switch(status) {
case 0: /* no error */
continue; /* N.B., other cases sleep */
case PQUEUE_END:
udebug("End of Queue");
break;
case EAGAIN:
case EACCES:
udebug("Hit a lock");
break;
default:
uerror("pq_sequence failed: %s (errno = %d)",
strerror(status), status);
exit(1);
break;
}
if(interval == 0)
break;
pq_suspend(interval);
}
exit(0);
/*NOTREACHED*/
}