[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: 20000522: ldmd.conf question regarding pqsurf
- Subject: Re: 20000522: ldmd.conf question regarding pqsurf
- Date: Mon, 22 May 2000 15:32:32 -0600 (MDT)
Gilbert,
At first the FXUS4* metars were TEST metars, now the designator TEST has
been eliminated. I made some versions of the code for some UCAR folks
that wanted the reports. I'll send you the source code for
wmo_header.(c|h), surf_split.c and pqsurf.c. Enter these files into the
pqsurf dir and compile it. At this point, I'll now consider putting the
changes into the main body of the LDM code. One needs to remove the "pil"
suffix; also, save the original files before overwriting them.
Robb...
On Mon, 22 May 2000, Unidata Support wrote:
>
> ------- Forwarded Message
>
> >To: General Support <address@hidden>
> >From: Gilbert Sebenste <address@hidden>
> >Subject: ldmd.conf question regarding pqsurf
> >Organization: UCAR/Unidata
> >Keywords: 200005201749.e4KHn0T17954
>
> Hello,
>
> I have a quick question regarding pqsurf. In addition to getting the
> SA|SPUS products, I would also like to add FXUS4. K... to be captured by
> pqsurf. I looked over the LDM instructions, and can't figure out how to do
> it without messing up my original entry.
>
> Any help greatly appreciated! BTW, they are sending metars under this
> header as well now. Fun!
>
> *******************************************************************************
> Gilbert Sebenste ********
> Internet: address@hidden (My opinions only!) ******
> Staff Meteorologist, Northern Illinois University ****
> E-mail: address@hidden ***
> web: http://weather.admin.niu.edu **
> Work phone: 815-753-5492 *
> *******************************************************************************
>
>
> ------- End of Forwarded Message
>
===============================================================================
Robb Kambic Unidata Program Center
Software Engineer III Univ. Corp for Atmospheric Research
address@hidden WWW: http://www.unidata.ucar.edu/
===============================================================================
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: pqsurf.c,v 1.51 1999/04/02 23:16:35 davis Exp $ */
/*
*
*/
/*
* Need to create a queue before running this:
* pqcreate -c -s 2M -S 13762 /usr/local/ldm/data/pqsurf.pq
*/
#include <ldmconfig.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rpc/rpc.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <sys/wait.h>
#include <assert.h>
#include <regex.h>
#include "ldm.h"
#include "atofeedt.h"
#include "ldmprint.h"
#include "ulog.h"
#include "pq.h"
#include "paths.h"
#include "surface.h"
#ifdef NO_ATEXIT
#include "atexit.h"
#endif
/*
 * File-scope configuration for pqsurf: compile-time defaults (each may be
 * overridden by defining the macro before this point) and the globals shared
 * between main(), the signal handler and the exit handler.
 */
/* Defined in wmo_header.c, where it is initialized to 0; main() sets it to 1. */
extern int usePil; /* 1/0 flag to signal use of AFOS like pil identifier */
/* Default polling interval in seconds (the -i option). */
#ifndef DEFAULT_INTERVAL
#define DEFAULT_INTERVAL 15
#endif
/* Flags written by signal_handler() and polled by the processing loops. */
static volatile int done = 0;      /* set on SIGTERM */
static volatile int intr = 0;      /* set on SIGINT */
static volatile int stats_req = 0; /* set on SIGUSR1; cleared after dump_stats() */
/* Default product-identifier regular expression (the -p option). */
#ifndef DEFAULT_PATTERN
#define DEFAULT_PATTERN "^S[AIMNP]"
#endif
/* Default feedtype mask (the -f option). */
#ifndef DEFAULT_FEEDTYPE
#define DEFAULT_FEEDTYPE (IDS|DDS)
#endif
/* Input product queue: path (the -q option or LDMPQFNAME) and open handle. */
static const char *pqfname = DEFAULT_QUEUE;
static pqueue *pq = NULL;
/* set in paths.h by configure */
#ifndef DEFAULT_SURF_OUTQUEUE
#define DEFAULT_SURF_OUTQUEUE "/usr/local/ldm/data/pqsurf.pq"
#endif
/* Output product queue: path (the -Q option) and open handle. */
static char *opqfname = DEFAULT_SURF_OUTQUEUE;
static pqueue *opq = NULL;
/* set in paths.h by configure */
#ifndef DEFAULT_SURF_CONFFILE
#define DEFAULT_SURF_CONFFILE "/usr/local/ldm/etc/pqsurf.conf"
#endif
/* set in paths.h by configure */
#ifndef DEFAULT_SURF_DATADIR
#ifndef DEFAULT_DATADIR
#define DEFAULT_SURF_DATADIR "/usr/local/ldm"
#else
#define DEFAULT_SURF_DATADIR DEFAULT_DATADIR
#endif
#endif
/* Default pipe write timeout in seconds, as reported by usage() for -t. */
#ifndef DEFAULT_PIPE_TIMEO
#define DEFAULT_PIPE_TIMEO 60
#endif
/* Default expiry age in hours (the -a option); main() converts it to seconds. */
#ifndef DEFAULT_AGE
#define DEFAULT_AGE (1. + (double)(DEFAULT_INTERVAL)/3600.)
#endif
/*
 * Process id of the pqact child started by main(). reap_act() sets it to -1
 * once the child is gone. Being static without an initializer it is 0 until
 * run_child() has been called.
 */
static pid_t act_pid;
/*
* Set to non-root privilege if possible.
* Do it in such a way that it is safe to fork.
* TODO: this is duplicated from ../server/priv.c
*/
static void
endpriv(void)
{
    const uid_t effective = geteuid();
    const uid_t real = getuid();

    /* Prefer the effective id when it is already unprivileged. */
    if(effective > 0)
    {
        setuid(effective);
        return;
    }

    /* Effective id is root: fall back to an unprivileged real id. */
    if(real > 0)
        setuid(real);

    /* Both ids are root: nothing is changed (warn? switch to nobody?). */
}
/*
 * Forks and exec's the program named by argv[0] with the given argument
 * vector (which must be NULL terminated, as required by execvp()).
 *
 * In debug mode the command line is logged first. The log text is built in
 * a fixed-size buffer and is silently truncated if the arguments do not fit,
 * rather than overrunning the buffer.
 *
 * The child restores default SIGCHLD and SIGTERM handling, drops root
 * privilege via endpriv() and keeps the parent's descriptors.
 *
 * Returns the child's pid in the parent, or -1 if fork() failed.
 */
static pid_t
run_child(int argc, char *argv[])
{
    pid_t pid;

    if(ulogIsDebug())
    {
        char command[1024];
        size_t used = 0;
        int ii;

        command[0] = 0;
        for(ii = 0; ii < argc && used < sizeof(command) - 1; ii++)
        {
            /* snprintf() always terminates and never writes past the end */
            const int n = snprintf(command + used, sizeof(command) - used,
                "%s%s", ii == 0 ? "" : " ", argv[ii]);
            if(n < 0)
                break;
            /* on truncation 'used' passes the limit and the loop stops */
            used += (size_t)n;
        }
        udebug("exec'ing: \"%s\"", command);
    }

    pid = fork();
    if(pid == -1)
    {
        serror("run_child: fork failed");
        return pid;
    }

    if(pid == 0)
    { /* child */
        (void)signal(SIGCHLD, SIG_DFL);
        (void)signal(SIGTERM, SIG_DFL);

        /* keep same descriptors as parent */

        /* don't let child get real privilege */
        endpriv();

        (void) execvp(argv[0], &argv[0]);
        /* only reached if the exec failed */
        serror("run_child: execvp: %s", argv[0]);
        _exit(127);
    }
    /* else, parent */

    return pid;
}
/* Running totals reported by dump_stats(). */
static int nprods = 0; /* products handed to split_prod() */
static int nsplit = 0; /* observations passed to doOne() */
static int ndups = 0;  /* observations the output queue already held */

/*
 * Logs the three running totals at NOTICE level.
 */
static void
dump_stats(void)
{
    const int products = nprods;
    const int observations = nsplit;
    const int duplicates = ndups;

    unotice("Number of products %d", products);
    unotice("Number of observations %d", observations);
    unotice("Number of dups %d", duplicates);
}
/* defined in surf_split.c */
extern int surf_split(const prod_info *infop, const void *datap,
        int (*doit)(const prod_info *, const void *));

/*
 * Inserts one observation into the output queue 'opq'.
 * This is the 'doit' callback handed to surf_split().
 *
 * 'nsplit' is bumped on every call, whether or not the insertion succeeds;
 * 'ndups' is bumped when the queue reports the product as a duplicate.
 *
 * Returns the status from pq_insertNoSig(): ENOERR on success,
 * PQUEUE_DUP for a duplicate, anything else is logged as an error.
 */
static int
doOne(const prod_info *infop, const void *datap)
{
    struct product prod;
    int status;

    if(ulogIsDebug())
        udebug("%s", s_prod_info(NULL, 0, infop, 1));

    prod.info = *infop;
    prod.data = (void *)datap; /* cast away const */

    nsplit++; /* ?? Do it here on only on success ?? */

    status = pq_insertNoSig(opq, &prod);

    if(status != ENOERR)
    {
        if(status != PQUEUE_DUP)
        {
            /* genuine failure */
            uerror("pq_insert: %s\n", strerror(status));
        }
        else
        {
            ndups++;
            if(ulogIsVerbose())
                uinfo("Product already in queue: %s",
                    s_prod_info(NULL, 0, &prod.info,
                        ulogIsDebug()));
        }
    }

    return status;
}
/*
*/
/*
 * pq_sequence() callback: splits one input product into individual
 * observations with surf_split(), each of which is inserted into the output
 * queue by doOne(), then wakes the pqact child with SIGCONT.
 *
 * infop, datap  description and data of the product to split
 * xprod, size   unused here
 * vp            if not NULL, points to a size_t that receives the number of
 *               observations split out (only when surf_split() succeeded)
 *
 * Always returns 0.
 */
static int
split_prod(const prod_info *infop, const void *datap,
        void *xprod, size_t size, void *vp)
{
    size_t *nsp = (size_t *)vp;
    int ns;

    (void)xprod;
    (void)size;

    if(ulogIsVerbose())
        uinfo("%s", s_prod_info(NULL, 0, infop, ulogIsDebug()));

    ns = surf_split(infop, datap, doOne);

    nprods++;

    /*
     * kill() takes the pid first and the signal second. Only signal a real
     * child: pid 0 would address our whole process group and -1 every
     * process we are allowed to signal.
     */
    if(act_pid > 0)
        (void)kill(act_pid, SIGCONT);

    if(nsp != NULL && ns >= 0)
        *nsp = (size_t)ns;

    return 0;
}
static void
usage(const char *av0) /* id string */
{
(void)fprintf(stderr,
"Usage: %s [options] [confilename]\t\nOptions:\n",
av0);
(void)fprintf(stderr,
"\t-v Verbose, log each match (SIGUSR2 toggles)\n");
(void)fprintf(stderr,
"\t-x Debug mode\n");
(void)fprintf(stderr,
"\t-l logfile Send log info to file (default uses
syslogd)\n");
(void)fprintf(stderr,
"\t-d datadir cd to \"datadir\" before interpreting filenames
in\n");
(void)fprintf(stderr,
"\t conffile (default %s)\n",
DEFAULT_SURF_DATADIR);
(void)fprintf(stderr,
"\t-q queue default \"%s\"\n", DEFAULT_QUEUE);
(void)fprintf(stderr,
"\t-p pattern Interested in products matching \"pattern\"
(default \"%s\")\n", DEFAULT_PATTERN);
(void)fprintf(stderr,
"\t-f feedtype Interested in products from feed \"feedtype\"
(default %s)\n", s_feedtypet(DEFAULT_FEEDTYPE));
(void)fprintf(stderr,
"\t-i interval loop, polling each \"interval\" seconds
(default %d)\n", DEFAULT_INTERVAL);
(void)fprintf(stderr,
"\t-a age Expire products older than \"age\" hours
(default %.4f)\n", DEFAULT_AGE);
(void)fprintf(stderr,
"\t-t timeo set write timeo for PIPE subprocs to \"timeo\"
secs (default %d)\n", DEFAULT_PIPE_TIMEO);
(void)fprintf(stderr,
"\t-o offset the oldest product we will consider is
\"offset\" secs before now (default: most recent in output queue)\n");
(void)fprintf(stderr,
"\t-Q outQueue default \"%s\"\n", DEFAULT_SURF_OUTQUEUE);
(void)fprintf(stderr,
"\t(default conffilename is %s)\n",
DEFAULT_SURF_CONFFILE);
exit(1);
}
/*
 * Waits for the pqact child (act_pid) and reacts to its change of state.
 *
 * options  passed to waitpid(), e.g. 0 to block or WNOHANG to poll. When
 *          NO_WAITPID is defined, wait() is used instead and only when
 *          options is 0.
 *
 * Returns -1 if the wait failed, otherwise the value the wait call returned
 * (0 when nothing was reaped).
 *
 * N.B.: unless NO_WAITPID is defined, this function calls exit() when the
 * child has exited, or was terminated by one of the signals listed in the
 * switch below; act_pid is set to -1 first so that cleanup() does not try
 * to signal the child again.
 */
static pid_t
reap_act(int options)
{
pid_t wpid = 0;
int status = 0;
#ifndef NO_WAITPID
wpid = waitpid(act_pid, &status, options);
#else
if(options == 0)
wpid = wait(&status);
/* customize here for older systems, use wait3 or whatever */
#endif
if(wpid == -1)
{
/* ECHILD after the child has already been reaped is expected: stay quiet */
if(!(errno == ECHILD && act_pid == -1))
{
/* Only complain if relevant */
serror("waitpid");
}
return -1;
}
/* else */
if(wpid != 0)
{
/* tag_pid_entry(wpid); */
#ifndef NO_WAITPID
if(WIFSTOPPED(status))
{
/* child is only stopped: log it and carry on */
unotice("child %d stopped by signal %d",
wpid, WSTOPSIG(status));
}
else if(WIFSIGNALED(status))
{
unotice("child %d terminated by signal %d",
wpid, WTERMSIG(status));
/* DEBUG */
switch(WTERMSIG(status)) {
/*
* If a child dumped core,
* shut everything down.
*/
case SIGQUIT:
case SIGILL:
case SIGTRAP: /* ??? */
case SIGABRT:
#if defined(SIGEMT)
case SIGEMT: /* ??? */
#endif
case SIGFPE: /* ??? */
case SIGBUS:
case SIGSEGV:
#if defined(SIGSYS)
case SIGSYS: /* ??? */
#endif
#ifdef SIGXCPU
case SIGXCPU:
#endif
#ifdef SIGXFSZ
case SIGXFSZ:
#endif
act_pid = -1;
exit(1);
break;
}
/* termination by any other signal is logged but does not end pqsurf here */
}
else if(WIFEXITED(status))
{
/* child exited on its own: log it and exit with the same status */
if(WEXITSTATUS(status) != 0)
unotice("child %d exited with status %d",
wpid, WEXITSTATUS(status));
else
udebug("child %d exited with status %d",
wpid, WEXITSTATUS(status));
act_pid = -1;
exit(WEXITSTATUS(status));
}
#endif
}
return wpid;
}
/*
 * Exit handler, registered with atexit() by main().
 *
 * Terminates and reaps the pqact child if one is running, closes the output
 * queue (logging its high-water marks) and the input queue, logs the
 * statistics and closes the log.
 */
void
cleanup(void)
{
    unotice("Exiting");

    /*
     * Only signal a child that was really started. act_pid is 0 until
     * run_child() has been called and -1 after a failed fork or once the
     * child is reaped; kill(0, ...) would hit our whole process group and
     * kill(-1, ...) every process we may signal.
     */
    if(act_pid > 0)
    {
        /* we reap the child ourselves below; avoid re-entering via SIGCHLD */
        (void)signal(SIGCHLD, SIG_IGN);
        (void)kill(act_pid, SIGTERM);
        (void) reap_act(0);
    }

    if(opq != NULL)
    {
        off_t highwater = 0;
        size_t maxregions = 0;

        /* read the usage figures before the queue is closed */
        (void) pq_highwater(opq, &highwater, &maxregions);
        (void) pq_close(opq);
        opq = NULL;

        unotice(" Queue usage (bytes):%8ld",
            (long)highwater);
        unotice(" (nregions):%8ld",
            (long)maxregions);
    }

    if(pq != NULL)
    {
        (void) pq_close(pq);
        pq = NULL;
    }

    dump_stats();

    (void) closeulog();
}
/*
 * Common handler for the signals registered by set_sigactions().
 *
 *   SIGINT   log, flag 'intr' and exit(0)
 *   SIGTERM  flag 'done' so the processing loops finish
 *   SIGUSR1  flag 'stats_req' so statistics get logged
 *   SIGUSR2  roll the logging priority
 *   SIGCHLD  reap the pqact child (which usually ends in exit())
 *
 * Any other signal is only noted at debug level.
 */
static void
signal_handler(int sig)
{
#ifdef SVR3SIGNALS
    /*
     * Some systems reset handler to SIG_DFL upon entry to handler.
     * In that case, we reregister our handler.
     */
    (void) signal(sig, signal_handler);
#endif
    if(sig == SIGINT)
    {
        unotice("Interrupt");
        intr = !0;
        exit(0);
    }
    else if(sig == SIGTERM)
    {
        udebug("SIGTERM");
        (void) sleep(0);
        done = !0;
    }
    else if(sig == SIGUSR1)
    {
        udebug("SIGUSR1");
        stats_req = !0;
    }
    else if(sig == SIGUSR2)
    {
        udebug("SIGUSR2");
        rollulogpri();
    }
    else if(sig == SIGCHLD)
    {
        /* usually calls exit */
        (void) reap_act(WNOHANG);
    }
    else
    {
        udebug("signal_handler: unhandled signal: %d", sig);
    }
}
/*
* register the signal_handler
*/
/*
 * Installs the process's signal dispositions:
 *   - SIGHUP, SIGPIPE and SIGALRM are ignored;
 *   - SIGTERM, SIGUSR1, SIGUSR2 and SIGCHLD go to signal_handler(), with
 *     interrupted system calls restarted where SA_RESTART exists;
 *   - SIGINT goes to signal_handler() without restarting system calls.
 */
static void
set_sigactions(void)
{
    static const int ignored[] = { SIGHUP, SIGPIPE, SIGALRM };
    static const int restarted[] = { SIGTERM, SIGUSR1, SIGUSR2, SIGCHLD };
    struct sigaction sigact;
    size_t ii;

    sigemptyset(&sigact.sa_mask);
    sigact.sa_flags = 0;

    /* Ignore these */
    sigact.sa_handler = SIG_IGN;
    for(ii = 0; ii < sizeof(ignored)/sizeof(ignored[0]); ii++)
        (void) sigaction(ignored[ii], &sigact, NULL);

    /* Handle these */
#ifdef SA_RESTART /* SVR4, 4.3+ BSD */
    /* usually, restart system calls */
    sigact.sa_flags |= SA_RESTART;
#endif
    sigact.sa_handler = signal_handler;
    for(ii = 0; ii < sizeof(restarted)/sizeof(restarted[0]); ii++)
        (void) sigaction(restarted[ii], &sigact, NULL);

    /* Don't restart after interrupt */
    sigact.sa_flags = 0;
#ifdef SA_INTERRUPT /* SunOS 4.x */
    sigact.sa_flags |= SA_INTERRUPT;
#endif
    (void) sigaction(SIGINT, &sigact, NULL);
}
/*
 * Deletes old products from the queue 'epq'.
 *
 * epq       the queue to expire products from
 * interval  minimum number of seconds between two expiry runs
 * age       products older than this many seconds are deleted
 *
 * The product class used for deletion is static and set up on the first
 * call; it matches every feedtype and every identifier. A run is skipped
 * ("not yet") when less than 'interval' seconds have passed since the
 * previous one. The deletion loop also stops early when 'done' or
 * 'stats_req' is raised by a signal.
 *
 * Returns ENOERR when the run was skipped, otherwise the status of the
 * last pq_seqdel() call.
 */
static int
expire(pqueue *epq, const unsigned interval, const double age)
{
int status = ENOERR;
static timestampt now;
static prod_class eclss;
static prod_spec spec;
timestampt ts;
timestampt cursor;
double diff = 0.;
double max_latency = 0.;
size_t nr;
if(eclss.psa.psa_val == 0)
{
/* first time */
eclss.from = TS_ZERO;
eclss.psa.psa_len = 1;
eclss.psa.psa_val = &spec;
spec.feedtype = ANY;
spec.pattern = ".*";
/* NOTE(review): the regcomp() result is not checked */
regcomp(&spec.rgx, spec.pattern, REG_EXTENDED|REG_NOSUB);
}
(void) set_timestamp(&now);
/* eclss.to holds (time of previous run - age), hence the "+ age" here */
if(d_diff_timestamp(&now, &eclss.to) < interval + age)
{
/* only run this routine every interval seconds */
udebug("not yet");
return ENOERR;
}
/* else */
/* everything inserted before (now - age) is a candidate for deletion */
eclss.to = now;
eclss.to.tv_sec -= age;
if(ulogIsDebug())
{
char cp[64];
sprint_timestampt(cp, sizeof(cp), &eclss.to);
udebug("to %s", cp);
}
/* start scanning at the beginning of the queue */
pq_cset(epq, &TS_ZERO);
while(!done && !stats_req)
{
nr = 0;
status = pq_seqdel(epq, TV_GT, &eclss, 0, &nr, &ts);
switch(status) {
case ENOERR:
/* track the largest gap seen between the queue cursor and ts */
pq_ctimestamp(epq, &cursor);
diff = d_diff_timestamp(&cursor, &ts);
if(diff > max_latency)
{
max_latency = diff;
udebug("max_latency %.3f", max_latency);
}
if(nr == 0)
{
/*
 * Nothing was deleted this time: give up once the cursor is more
 * than interval + max_latency seconds past the expiry time.
 */
diff = d_diff_timestamp(&cursor, &eclss.to);
udebug("diff %.3f", diff);
if(diff > interval + max_latency)
{
udebug("heuristic depth break");
break;
}
}
continue; /* N.B., other cases break and return */
case PQUEUE_END:
udebug("expire: End of Queue");
break;
case EAGAIN:
case EACCES:
udebug("Hit a lock");
break;
#if defined(EDEADLOCK) && EDEADLOCK != EDEADLK
case EDEADLOCK:
#endif
case EDEADLK:
uerror("%s", strerror(status));
break;
default:
uerror("pq_seqdel failed: %s (errno = %d)",
strerror(status), status);
break;
}
/* reached by every 'break' inside the switch: leave the while loop */
break;
}
return status;
}
/* Capacity of the pqact argument vector, including the terminating NULL. */
#define PQACT_ARGV_MAX 32

/*
 * Appends 'arg' to the pqact argument vector, always leaving the last slot
 * free for the NULL terminator that execvp() requires. Exits with status 1
 * if the vector is full.
 */
static void
add_pqact_arg(char *argv[], int *argcp, char *arg)
{
    if(*argcp >= PQACT_ARGV_MAX - 1)
    {
        fprintf(stderr, "Too many arguments to pass on to pqact\n");
        exit(1);
    }
    argv[(*argcp)++] = arg;
}

/*
 * pqsurf: reads surface bulletins matching a pattern from the input product
 * queue, splits them into single observations which are inserted into an
 * output queue, runs a pqact child on that output queue and periodically
 * expires old products from it.
 *
 * Options are described by usage(). The -v, -x, -l, -f, -o and -t options
 * are also passed on to the pqact child.
 */
int
main(int ac, char *av[])
{
    const char *progname = ubasename(av[0]);
    char *logfname;
    prod_class clss;
    prod_spec spec;
    int status = 0;
    unsigned interval = DEFAULT_INTERVAL;
    int logoptions = (LOG_CONS|LOG_PID);
    double age = DEFAULT_AGE;
    /* these are containers for the pqact args */
    char *argv[PQACT_ARGV_MAX];
    int argc = 0;
    int toffset = TOFFSET_NONE;

    logfname = "";

    if(set_timestamp(&clss.from) != ENOERR) /* corrected by toffset below */
    {
        int errnum = errno;
        fprintf(stderr, "Couldn't set timestamp: %s",
            strerror(errnum));
        exit(1);
    }
    clss.to = TS_ENDT;
    clss.psa.psa_len = 1;
    clss.psa.psa_val = &spec;
    spec.feedtype = DEFAULT_FEEDTYPE;
    spec.pattern = DEFAULT_PATTERN;

    /* zero fill, so the vector is NULL terminated whatever gets added */
    memset(argv, 0, sizeof(argv));
    add_pqact_arg(argv, &argc, "pqact");

    /*
     * Check the environment for some options.
     * May be overridden by command line switches below.
     */
    {
        const char *ldmpqfname = getenv("LDMPQFNAME");
        if(ldmpqfname != NULL)
            pqfname = ldmpqfname;
    }

    {
        extern int optind;
        extern int opterr;
        extern char *optarg;
        int ch;
        int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE));
        int fterr;
        char *conffilename = DEFAULT_SURF_CONFFILE;
        char *datadir = DEFAULT_SURF_DATADIR;

        usePil = 1;
        opterr = 1;

        while ((ch = getopt(ac, av, "vxl:d:f:p:q:Q:o:i:a:t:")) != EOF)
            switch (ch) {
            case 'v':
                add_pqact_arg(argv, &argc, "-v");
                logmask |= LOG_MASK(LOG_INFO);
                break;
            case 'x':
                add_pqact_arg(argv, &argc, "-x");
                logmask |= LOG_MASK(LOG_DEBUG);
                break;
            case 'l':
                add_pqact_arg(argv, &argc, "-l");
                add_pqact_arg(argv, &argc, optarg);
                logfname = optarg;
                break;
            case 'd':
                datadir = optarg;
                break;
            case 'f':
                fterr = strfeedtypet(optarg, &spec.feedtype);
                if(fterr != FEEDTYPE_OK)
                {
                    fprintf(stderr, "%s: %s: \"%s\"\n",
                        av[0], strfeederr(fterr), optarg);
                    usage(progname);
                }
                add_pqact_arg(argv, &argc, "-f");
                add_pqact_arg(argv, &argc, optarg);
                break;
            case 'p':
                spec.pattern = optarg;
                /* compiled below */
                break;
            case 'q':
                pqfname = optarg;
                break;
            case 'Q':
                opqfname = optarg;
                break;
            case 'o':
                toffset = atoi(optarg);
                if(toffset == 0 && *optarg != '0')
                {
                    fprintf(stderr, "%s: invalid offset %s\n",
                        av[0], optarg);
                    usage(av[0]);
                }
                add_pqact_arg(argv, &argc, "-o");
                add_pqact_arg(argv, &argc, optarg);
                break;
            case 'i':
                interval = atoi(optarg);
                if(interval == 0 && *optarg != '0')
                {
                    fprintf(stderr, "%s: invalid interval \"%s\"\n",
                        av[0], optarg);
                    usage(av[0]);
                }
                /* N.B. -i just used for input queue. */
                break;
            case 'a':
                age = atof(optarg);
                if(age < 0.)
                {
                    (void) fprintf(stderr,
                        "age (%s) must be non negative\n",
                        optarg);
                    usage(av[0]);
                }
                break;
            case 't':
                /* pipe_timeo */
                add_pqact_arg(argv, &argc, "-t");
                add_pqact_arg(argv, &argc, optarg);
                break;
            case '?':
                usage(progname);
                break;
            }

        (void) setulogmask(logmask);

        status = regcomp(&spec.rgx,
            spec.pattern,
            REG_EXTENDED|REG_NOSUB);
        if(status != 0)
        {
            fprintf(stderr, "Bad regular expression \"%s\"\n",
                spec.pattern);
            usage(av[0]);
        }

        if(ac - optind == 1)
            conffilename = av[optind];

        /* pqact works on the output queue, not on the input queue */
        add_pqact_arg(argv, &argc, "-d");
        add_pqact_arg(argv, &argc, datadir);
        add_pqact_arg(argv, &argc, "-q");
        add_pqact_arg(argv, &argc, opqfname);
        add_pqact_arg(argv, &argc, conffilename);

        /* the age is given in hours; seconds are used from here on */
        age *= 3600.;
    }

    if(toffset != TOFFSET_NONE)
    {
        clss.from.tv_sec -= toffset;
    }
    else
    {
        clss.from.tv_sec -= (age - interval);
    }

    /*
     * Set up error logging.
     * N.B. log ident is the remote
     */
    (void) openulog(progname,
        logoptions, LOG_LDM, logfname);
    unotice("Starting Up (%d)", getpgrp());

    /*
     * register exit handler
     */
    if(atexit(cleanup) != 0)
    {
        serror("atexit");
        exit(1);
    }

    /*
     * set up signal handlers
     */
    set_sigactions();

    /*
     * Open the Output product que
     */
    status = pq_open(opqfname, PQ_DEFAULT, &opq);
    if(status)
    {
        uerror("pq_open failed: %s: %s\n",
            opqfname, strerror(status));
        exit(1);
    }

    act_pid = run_child(argc, argv);
    if(act_pid == (pid_t)-1)
        exit(1);

    /*
     * Open the product que
     */
    status = pq_open(pqfname, PQ_READONLY, &pq);
    if(status)
    {
        uerror("pq_open failed: %s: %s\n",
            pqfname, strerror(status));
        exit(1);
    }

    if(toffset == TOFFSET_NONE)
    {
        /* Jump to the end of the queue */
        timestampt sav;
        sav = clss.from;
        clss.from = TS_ZERO;
        (void) pq_last(pq, &clss, NULL);
        clss.from = sav;
    }
    else
    {
        pq_cset(pq, &clss.from);
    }

    if(ulogIsVerbose())
    {
        char buf[1984];
        uinfo("%s",
            s_prod_class(buf, sizeof(buf), &clss));
    }

    while(!done)
    {
        if(stats_req)
        {
            dump_stats();
            stats_req = 0;
        }

        status = pq_sequence(pq, TV_GT, &clss, split_prod, NULL);

        switch(status) {
        case 0: /* no error */
            continue; /* N.B., other cases sleep */
        case PQUEUE_END:
            udebug("surf: End of Queue");
            break;
        case EAGAIN:
        case EACCES:
            udebug("Hit a lock");
            break;
        default:
            uerror("pq_sequence failed: %s (errno = %d)",
                strerror(status), status);
            exit(1);
            break;
        }

        /* an interval of 0 means: one pass over the queue, then stop */
        if(interval == 0)
        {
            break;
        }

        (void) expire(opq, interval, age);

        pq_suspend(interval);
    }

    /*
     * TODO: how can we determine that pqact has finished
     * the work in opq?
     */
    sleep(5);

    exit(0);
    /*NOTREACHED*/
    return 0;
}
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: surf_split.c,v 1.30 1999/12/02 23:21:26 rkambic Exp $ */
#include <ldmconfig.h>
#include <stdio.h>
#include <ctype.h>
#include <string.h>
#include "ldm.h"
#include "ulog.h"
#include "wmo_header.h"
#include "tokens.h"
#include "xbuf.h"
#include "surface.h" /* wind_units_t, CALL_SIGN_LEN */
#include "md5.h"
/*
 * Static storage for the MD5 context used by surf_split(). The storage is
 * declared as an array of double and used through an MD5_CTX pointer
 * (presumably to get suitable alignment without exposing the MD5_CTX size
 * here -- TODO confirm).
 */
static double md5ctx[16]; /* 88 would be big enough */
static MD5_CTX *md5ctxp = (MD5_CTX *)md5ctx;
/*
 * Reads two 2-digit numbers from 'buf' (the first via dget_wnum(), the
 * second via dget_num()) and stores them in 'time' through
 * set_dtime(time, first, second, 0).
 *
 * Returns the negative status of the first read that fails, in which case
 * 'time' is left untouched; otherwise the status of the second read.
 */
static int
get_yygg(xbuf *buf, dtime *time)
{
    int first = -1;
    int second = -1;
    int status = dget_wnum(buf, &first, 2);

    if(status >= 0)
    {
        status = dget_num(buf, &second, 2);
        if(status >= 0)
            set_dtime(time, first, second, 0);
    }

    return status;
}
/* For METAR, check if the HHMMZ time string is present */
/*
 * For METAR: after skipping leading non-graphic characters, tells whether
 * the next five characters in 'buf' are four digits followed by 'Z'.
 * Only the skipped characters are consumed.
 *
 * Returns 1 if the time group is present, 0 otherwise.
 */
static int
whas_yyggZ(xbuf *buf)
{
    int ch;
    int ii;

    /* skip white space */
    for(ch = nextc(buf); isascii(ch) && !isgraph(ch); ch = nextc(buf))
        ; /* empty */
    unnextc(buf,ch);

    if(buf->cnt < 5)
        return 0; /* not enough characters */

    if(buf->get[4] != 'Z')
        return 0;

    for(ii = 3; ii >= 0; ii--)
    {
        if(!isdigit(buf->get[ii]))
            return 0;
    }

    return 1; /* passed */
}
/* For METAR, check if "NIL" */
/*
 * For METAR: tells whether the last three characters of the buffer are
 * "NIL".
 *
 * Returns 1 if so, 0 otherwise. A buffer shorter than three characters
 * cannot hold "NIL" and yields 0 instead of reading in front of the buffer.
 */
static int
has_NIL(xbuf *buf)
{
    enum { NIL_LEN = 3 };
    static const char nilstr[] = "NIL";
    const char *np;

    if(buf->bufsiz < NIL_LEN)
        return 0;

    /* start of the last NIL_LEN characters */
    np = (const char *)&buf->base[buf->bufsiz - NIL_LEN];

    return strncmp(np, nilstr, NIL_LEN) == 0;
}
/* For METAR, get the bulletin time, if possible */
/*
 * For METAR: if a four digit time group followed by 'Z' comes next in
 * 'buf', decodes it into 'time' with get_yygg() and consumes the 'Z'.
 * Otherwise 'time' is left unchanged.
 */
static void
get_wyyggZ(xbuf *buf, dtime *time)
{
    if(whas_yyggZ(buf))
    {
        int zulu;

        (void)get_yygg(buf, time);
        zulu = nextc(buf); /* eat the 'Z' */
        (void)zulu;
    }
}
/*
* Takes a WMO format product which is a
* SAO, SYNOP, SHIP, METAR, or SPECI message, splits it into
* individual observations. The observations are each encapsulated in a
* new product which inherits most of its description from the
* original product.
* The new product pkey is derived from the observation type
* and has the following form:
*
* SAO - "sao tt ccc ddhhmm"
* where:
* tt is SA, SP or RS
* ccc is the station ID like SFO, LXV, etc
* ddhhmm is the time stamp.
*
* SYNOP - "aaxx nnnnn ddhhmm"
* where:
* nnnnn is the WMO station id (5 digit number)
*
* SHIP - "bbxx c* ddhhmm"
* where:
* c* is the call sign
*
* METAR - "metar cccc ddhhmm"
* where:
* cccc is the call sign
*
* SPECI - "speci cccc ddhhmm"
*
* The new product sequence number is original sequence number times 1000
* plus the sequence of the individual observation within the product.
*
* 'doit' is called on each of the new products. It is presumed
* this function return zero upon success.
*
* Returns the number of successful calls to 'doit', eg, the
* number of splits. Returns -1 on error.
*/
/*
 * See the description above for the product types handled and the form of
 * the identifiers generated.
 *
 * infop  description of the product to split
 * datap  the product's data, infop->sz bytes
 * doit   called with each new observation product
 *
 * Returns the number of calls to 'doit' that returned 0, 0 when the
 * bulletin is of a form that is skipped, or -1 on error (including a
 * product that does not fit in the local 8192 byte work buffer).
 */
int
surf_split(const prod_info *infop, const void *datap,
int (*doit)(const prod_info *, const void *))
{
int action = -1;
wmo_header_t hdr;
message_type_t mtype;
dtime dt;
xbuf buf[1];
unsigned char dbuf[8192]; /* TODO */
int nsplit = 0;
enum {
SURFACE_BOGUS ,
AAXX,
US_AAXX,
BBXX,
SAO,
sMETAR,
sSPECI
} subtype = SURFACE_BOGUS;
hdr.time = &dt;
if(infop->sz > sizeof(dbuf))
return -1; /* TODO: too big */
/* work on a private copy of the data */
memcpy(dbuf, datap, infop->sz);
if( cbuftoxbuf(buf, (unsigned char *)dbuf,
infop->sz) == NULL)
return -1;
skipline(buf, 4); /* SOH */
skipline(buf, 12); /* start */
if( get_wmo_header(buf, &hdr) == NULL)
{
return -1;
}
#if DEBUG
fputs("\t", stderr);
fprint_wmo_header(stderr, &hdr);
fputs("\n", stderr);
#endif
mtype = decode_type(hdr.TT,hdr.AA,hdr.PIL);
/* #### */
{
char cbuf[8];
int digit;
dtime time;
wind_units_t wind_units = WIND_UNAVAIL;
time = *hdr.time; /* default the ob time to the time in the header */
/* delve into section 0 */
switch(mtype) {
case SYNOP :
if(get_wstr(buf, cbuf, 1) < 0 ) return -1;
if(cbuf[0] == 'A')
{
subtype = AAXX;
if(get_str(buf, &cbuf[1], 3) < 0 ) return -1;
if( cbuf[3] != 'X' )
{
/* punt */
uerror("surface_split: Unknown type: %s\n",
cbuf);
return 0;
}
if(get_yygg(buf, &time) < 0 ) return -1; /* YYGG */
if(dget_num(buf, &digit, 1) < 0 ) return -1; /* isubw */
if(digit >= 0 && digit <= 4) wind_units =
(wind_units_t)digit;
}
else if(isascii(cbuf[0]) && isdigit(cbuf[0])) /* US Stations 7NNNN */
{
unnextc(buf,cbuf[0]);
subtype = US_AAXX;
/*
* Some US reports leave off AAXX YYGGisubw, so we use the
* time from the wmo header.
*/
wind_units = KNOTS;
}
else
{
unnextc(buf,cbuf[0]);
return 0; /* ?? */
}
break;
case SHIP :
if(get_wstr(buf, cbuf, 4) < 0 ) return -1;
if(cbuf[0] == 'B')
{
if( cbuf[3] != 'X' )
{
/* punt */
uerror("surface_split: Unknown type: %s\n",
cbuf);
return 0;
}
subtype = BBXX;
/* get time below */
}
else
{
unnextc(buf,cbuf[0]);
return 0;
}
break;
case METAR :
if(whasSTR(buf, "METAR"))
{
subtype = sMETAR;
get_wyyggZ(buf, &time);
}
/*
 * NOTE(review): get_wmo_header() stores the PIL as " /p<pil>", so
 * PIL[0] and PIL[1] may never be 'M' and 'T' -- confirm.
 */
else if(hdr.PIL[0] == 'M' && hdr.PIL[1] == 'T')
{
uerror("HDR + PIL: %s%s %s", hdr.TT, hdr.AA,
hdr.PIL ) ;
/* skip 6 char PIL */
if(get_wstr(buf, cbuf, CALL_SIGN_LEN) < 0)
return 0;
subtype = sMETAR;
get_wyyggZ(buf, &time);
}
else
subtype = SAO; /* may actually be a METAR, check below */
break;
case SPECI :
if(whasSTR(buf, "SPECI"))
{
subtype = sSPECI;
get_wyyggZ(buf, &time);
}
/* N.B.: without the "SPECI" tag, subtype stays SURFACE_BOGUS */
break;
default :
uerror("surface_split: Can't handle %s",
sMessage_type(mtype) );
uerror("HDR + PIL: %s%s %s", hdr.TT, hdr.AA, hdr.PIL ) ;
return -1;
}
{ /* while block */
static char newkey[KEYSIZE];
xbuf subbuf[1];
prod_info newinfo = *infop;
#define MAX_SURF_LEN 511
#undef MIN
#define MIN(a,b) ((a) <= (b) ? (a) : (b))
char pbuf[MAX_SURF_LEN + 1];
int l1, l2;
static char ident[CALL_SIGN_LEN+1];
static char type[4];
u_int subseq = infop->seqno * 1000;
unsigned char *pp;
/* one pass per sub-buffer delivered by get_weqxbuf(), i.e. per observation */
while( get_weqxbuf(buf, subbuf) > 0 )
{
(void)memset(newkey,0,KEYSIZE);
(void)memset(pbuf,0,MAX_SURF_LEN + 1);
(void)memset(ident,0,CALL_SIGN_LEN+1);
/* by default the whole sub-buffer is copied into the new product */
pp = subbuf->base;
switch(subtype) {
case AAXX :
case US_AAXX :
strcpy(newkey, "aaxx ");
strcpy(pbuf, "AAXX");
sprintf(&pbuf[strlen(pbuf)], " %02d%02d%1d\r\r\n",
time.mday, time.hour, (int)wind_units);
/* WMO station no. */
if(get_wstr(subbuf, ident, 5) < 0)
continue;
strcat(newkey, ident);
break;
case BBXX :
strcpy(newkey, "bbxx ");
strcpy(pbuf, "BBXX\r\r\n");
/* call sign */
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
strcat(newkey, ident);
if(get_yygg(subbuf, &time) < 0) continue; /* YYGG */
break;
case sSPECI :
/* call sign */
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
if(strcmp(ident, "SPECI") == 0)
{
/* They package each ob with a tag */
pp = (subbuf->get +1);
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
}
if(!whas_yyggZ(subbuf))
{
/* Have to insert the date */
sprintf(pbuf, "SPECI\r\r\n%s %02d%02dZ ",
ident, time.hour, time.min);
pp = subbuf->get;
}
else
strcpy(pbuf, "SPECI\r\r\n");
strcpy(newkey, "speci ");
strcat(newkey, ident);
break;
case sMETAR :
if(has_NIL(subbuf))
continue;
/* call sign */
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
if(strcmp(ident, "METAR") == 0)
{
/* They package each ob with a tag */
pp = (subbuf->get +1);
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
}
if(!whas_yyggZ(subbuf))
{
/* Have to insert the date */
sprintf(pbuf, "METAR\r\r\n%s %02d%02dZ ",
ident, time.hour, time.min);
pp = subbuf->get;
}
else
strcpy(pbuf, "METAR\r\r\n");
strcpy(newkey, "metar ");
strcat(newkey, ident);
break;
case SAO :
/* call sign */
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
if(hdr.AA[0] == 'U' && hdr.AA[1] == 'S'
&& strlen(ident) == 6)
{
/* skip 6 char US "AFOS code" */
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
}
/* SA, SP, RS, USP or XP */
if(get_wstr(subbuf, type, 3) < 0)
continue;
if((type[0] == 'S'
&& (type[1] == 'A' || type[1] == 'P'))
|| (type[0] == 'R' && type[1] == 'S')
|| (type[0] == 'U' && type[1] == 'S'
&& type[2] == 'P')
|| (type[0] == 'X' && type[1] == 'P')
|| (type[0] == 'T' &&
(type[1] == 'A' || type[1] == 'S'))
)
{
strcpy(newkey, "sao ");
strcat(newkey, type);
strcat(newkey, " ");
strcat(newkey, ident);
}
else if(isdigit(type[0]) && isdigit(type[1]))
{
/* it is a METAR really */
/* N.B.: the remaining observations are then handled by the sMETAR case */
subtype = sMETAR;
strcpy(newkey, "metar ");
strcat(newkey, ident);
strcpy(pbuf, "METAR\r\r\n");
}
else
continue; /* don't know what it is, "NIL=" */
break;
}
/* safety net */
if(strlen(ident) == 0)
{
continue;
}
/* else */
/* complete the identifier: " ddhhmm", plus the BBB indicator if any */
sprintf(&newkey[strlen(newkey)], " %02d%02d%02d",
time.mday, time.hour, time.min);
if(hdr.retransmit != ORIGINAL)
sprintf(&newkey[strlen(newkey)], " %s",
sRetransmit(&hdr));
newinfo.ident = newkey;
newinfo.seqno = ++subseq;
/* body: the prefix built above, the observation text, then "=\r\r\n" */
l1 = strlen(pbuf);
l2 = MIN(MAX_SURF_LEN - l1 - 4, subbuf->bufsiz - (pp -
subbuf->base));
/* N.B.: silent truncation */
strncat(pbuf, (char *)pp, l2 );
strcat(pbuf,"=\r\r\n");
newinfo.sz = l1 + l2 + 4;
#if DEBUG
fprintf(stderr,"\t\t%s\n", newinfo.ident);
#endif
#if PRINT
{
char *cp = pbuf;
char *end = &cp[newinfo.sz];
while(cp < end)
{
putc(*cp, stderr);
cp++;
}
}
putc('\n', stderr);
#endif
/* the new product gets its own MD5 signature */
MD5Init(md5ctxp);
MD5Update(md5ctxp, (const unsigned char *)pbuf, newinfo.sz);
MD5Final(newinfo.signature, md5ctxp);
/*
* process the single ob in the requested fashion
*/
if((*doit)(&newinfo, pbuf) == 0)
nsplit++;
} /* end while */
#if PRINT
putc('\n', stderr);
#endif
} /* end while block */
} /* end #### block */
return nsplit;
}
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: wmo_header.c,v 1.30 1999/06/21 22:15:21 rkambic Exp $ */
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "wmo_header.h"
#include "alloc.h"
#include "tokens.h"
#include "ulog.h"
#include "xbuf.h"
int usePil=0;
/*
 * Releases a header together with the time it refers to.
 * Passing NULL is allowed and does nothing.
 */
void
free_wmo_header(wmo_header_t *hdr)
{
    if(hdr != NULL)
    {
        free_dtime(hdr->time);
        free(hdr);
    }
}
/*
 * Resets a header for reuse: the strings become empty, ii and the
 * retransmission sequence become 0, the retransmission indicator becomes
 * ORIGINAL, and the time the header points to is cleared with clear_dtime().
 */
static void
clear_wmo_header(wmo_header_t *hdr)
{
    hdr->PIL[0] = 0;
    hdr->CCCC[0] = 0;
    hdr->ii = 0;
    hdr->AA[0] = 0;
    hdr->TT[0] = 0;

    hdr->retransmit = ORIGINAL;
    hdr->retrans_seq = MESSAGE_TYPE_UNKNOWN; /* i.e. 0 */

    clear_dtime(hdr->time);
}
/*
 * Decodes a WMO abbreviated heading ("TTAAii CCCC YYGGgg [BBB]") from 'buf'
 * into 'hdr', which is cleared first. hdr->time must already point to a
 * dtime.
 *
 * When the global 'usePil' is 1, the token after the heading line is also
 * examined: if it stands alone on its line and looks like a 5 or 6
 * character alphanumeric (not all-numeric) identifier, optionally prefixed
 * by "^NMC", it is stored in hdr->PIL as " /p<identifier>".
 * NOTE(review): decode_type() tests for "/p" at PIL[0], which does not
 * agree with the leading blank stored here -- confirm which is intended.
 *
 * Returns 'hdr', or NULL if the buffer ran out while reading the heading.
 */
wmo_header_t *
get_wmo_header(xbuf *buf, wmo_header_t *hdr)
{
    clear_wmo_header(hdr);

    if( get_wstr(buf, hdr->TT, 2) == EOB ) return NULL;
    if( get_wstr(buf, hdr->AA, 2) == EOB ) return NULL;
    if( get_wnum(buf, &hdr->ii, 2) == EOB ) return NULL;
    if(hdr->ii == -1) hdr->ii = 0;
    if( get_wstr(buf, hdr->CCCC, 4) == EOB ) return NULL;

    {
        /* start from -1, as get_yygg() does, in case a number is not stored */
        int YY = -1, GG = -1, gg = -1;
        if( get_wnum(buf, &YY, 2) == EOB ) return NULL;
        if( get_wnum(buf, &GG, 2) == EOB ) return NULL;
        if( get_wnum(buf, &gg, 2) == EOB ) return NULL;
        /* uses current time on parse errors */
        hdr->time = set_dtime(hdr->time, YY,GG,gg);
    }

    /* decode BBB field */
    { /* inline */
        char line[16];
        char pilstr[16];
        int allnum, nonalph;
        size_t ich;

        if( get_wline(buf, line, sizeof(line)) == EOB ) return NULL;

        { /* inline inner */
            const char *cp;
            const char *const end = &line[strlen(line)];

            /* N.B. twisted flow here */
            for(cp = &line[0]; *cp != 0 && *cp != CR && cp + 2 < end; cp++)
            {
                switch( cp[0] ) {
                case 'R' :
                    if(cp[1] == 'R')
                    {
                        hdr->retransmit = RTD;
                        hdr->retrans_seq = cp[2];
                    }
                    else if(cp[1] == 'T')
                    {
                        hdr->retransmit = RTD;
                    }
                    break;
                case 'C' :
                    if(cp[1] == 'C')
                    {
                        hdr->retransmit = COR;
                        hdr->retrans_seq = cp[2];
                    }
                    else if(cp[1] == 'O')
                    {
                        hdr->retransmit = COR;
                    }
                    break;
                case 'A' :
                    if(cp[1] == 'A')
                    {
                        hdr->retransmit = AMD;
                        hdr->retrans_seq = cp[2];
                    }
                    else if(cp[1] == 'M')
                    {
                        hdr->retransmit = AMD;
                    }
                    break;
                case 'P' :
                    /* two letters encode the piece number in base 26 */
                    hdr->retransmit = PIE;
                    hdr->retrans_seq = (cp[1] - 'A') * 26
                        + (cp[2] - 'A');
                    goto done;
                case 0 :
                    break;
                default :
                    continue; /* loop, look some more */
                }
                /* only arrive here if we got an acceptable BBB string */
                if(hdr->retrans_seq == 0)
                {
                    int tmp;
                    /* see if they used old style, eg RTD01 */
                    cp += 3;
                    tmp = atoi(cp);
                    /* Attachment II-14 */
                    if(tmp >= 0 && tmp < 24 )
                        hdr->retrans_seq = tmp + 'A';
                    else
                        hdr->retrans_seq = 'Y';
                }
            done:
                break;
            } /* end for */
        } /* end inline inner */

        /*
         * Elsewhere get_wstr() is handed the buffer size less one (TT[3]
         * with 2, CCCC[5] with 4), so leave room for the terminator here
         * as well.
         */
        if((usePil == 1)
            && ( get_wstr(buf, pilstr, sizeof(pilstr) - 1) != EOB ))
        {
            if((pilstr[0] != '\0')
                && ( get_wline(buf, line, sizeof(line)) != EOB ))
            {
                const size_t pillen = strlen(pilstr);

                /* the identifier must be alone on its line */
                if((line[0] == '\0') && (pillen > 4) && (pillen < 7))
                {
                    nonalph = 0; allnum = 1;
                    for(ich = 0; ich < pillen; ich++)
                    {
                        const unsigned char uc = (unsigned char)pilstr[ich];
                        if(isalnum(uc) == 0) nonalph = 1;
                        if(isdigit(uc) == 0) allnum = 0;
                    }
                    /* " /p" plus at most 6 characters fits PIL[10] */
                    if((nonalph == 0) && (allnum == 0))
                        sprintf(hdr->PIL, " /p%s", pilstr);
                }
                else if((line[0] == '\0')
                    && (strncmp(pilstr, "^NMC", 4) == 0)
                    && (strlen(pilstr+4) > 4) && (strlen(pilstr+4) < 7))
                {
                    sprintf(hdr->PIL, " /p%s", pilstr+4);
                }
            }
        }
    } /* end inline */

    return hdr;
}
/*
 * Formats the BBB indicator of a header as a three character string:
 *   RTD -> "RRx" or "RTD",  COR -> "CCx" or "COR",  AMD -> "AAx" or "AMD",
 * where x is the sequence letter if it lies in 'A'..'Z', and
 *   PIE -> "Pxy", with the sequence number (limited to 0..675, out of
 *          range values becoming 675) written as two letters in base 26.
 *
 * Returns a pointer to a string constant or to a static buffer that is
 * overwritten by the next call; NULL for any other indicator, ORIGINAL
 * included.
 */
char *
sRetransmit(wmo_header_t *hdr)
{
    static char buf[4];
    int seq = hdr->retrans_seq;
    const char *pair;
    char *plain;

    if(hdr->retransmit == PIE)
    {
        if(seq < 0 || seq > 675) /* 26 * 26 -1 */
            seq = 675;
        sprintf(buf,"P%c%c", (char)(seq/26 + 'A'), (char)(seq%26 + 'A'));
        return buf;
    }

    switch ( hdr->retransmit ) {
    case RTD :
        pair = "RR";
        plain = "RTD";
        break;
    case COR :
        pair = "CC";
        plain = "COR";
        break;
    case AMD :
        pair = "AA";
        plain = "AMD";
        break;
    default :
        return NULL;
    }

    /* without a usable sequence letter, use the three letter form */
    if(seq < 'A' || seq > 'Z')
        return plain;

    sprintf(buf,"%s%c", pair, seq);
    return buf;
}
/*
 * Formats a header as "TTAAii CCCC DDHHMM[ BBB][PIL]".
 *
 * Returns a pointer to a static buffer that is overwritten by the next
 * call.
 */
char *
s_wmo_header(wmo_header_t *hdr)
{
#ifndef KEYSIZE
#define KEYSIZE 255
#endif
    static char sp[KEYSIZE+1];
    char *cp = sp;

    (void) memset(sp,0,sizeof(sp));

    sprintf(cp,
        "%2s%2s%02d %4s",
        hdr->TT, hdr->AA, hdr->ii, hdr->CCCC);
    cp += 11; /* 2 + 2 + 2 + 1 + 4: the width of "TTAAii CCCC" */

    if(hdr->time != NULL)
    {
        sprintf(cp,
            " %02d%02d%02d",
            hdr->time->mday, hdr->time->hour,
            hdr->time->min );
    }
    else
    {
        sprintf(cp, " DDHHMM");
    }
    cp += 7; /* the width of " DDHHMM" */

    if(hdr->retransmit != ORIGINAL)
        sprintf(cp, " %s", sRetransmit(hdr));

    /* PIL is a character array: test its first character, not a pointer */
    if(hdr->PIL[0] != '\0')
        strcat(sp,hdr->PIL);

    return sp;
}
/*
 * Disabled (compiled out by "#if 0"): writes a header to 'fp' as
 * "TTAAii CCCC DDHHMM[ BBB]" and returns ferror(fp).
 * N.B.: surf_split.c still calls this function when DEBUG is set.
 */
#if 0
int
fprint_wmo_header(FILE *fp, const wmo_header_t *hdr)
{
#if 1
fprintf(fp,
"%2s%2s%02d %4s",
hdr->TT, hdr->AA, hdr->ii, hdr->CCCC);
if(hdr->time != NULL)
{
fprintf(fp,
" %02d%02d%02d",
hdr->time->mday, hdr->time->hour,
hdr->time->min );
}
else
{
fprintf(fp, " DDHHMM");
}
if(hdr->retransmit != ORIGINAL)
fprintf(fp, " %s", sRetransmit(hdr));
#else
/* alternative: format through s_wmo_header() */
fputs( s_wmo_header(hdr) , fp );
#endif
return ferror(fp);
}
#endif
/*
 * Alternate version of clear_wmo_header(), compiled only if USED is set:
 * instead of clearing the time in place it frees it and sets the pointer
 * to NULL. It does not reset the PIL field.
 */
#if USED
static void
clear_wmo_header(wmo_header_t *hdr)
{
hdr->TT[0] = hdr->AA[0] = hdr->ii =
hdr->CCCC[0] = hdr->retransmit = hdr->retrans_seq = 0;
free_dtime(hdr->time);
hdr->time = NULL;
}
#endif /* USED */
/*
 * Gives the name of a message type as a string constant, or NULL if the
 * value is not one of the message_type_t enumerators.
 */
char *
sMessage_type(message_type_t type)
{
    char *name = NULL;

    switch(type) {
    case SYNOP :
        name = "SYNOP";
        break;
    case SHIP :
        name = "SHIP";
        break;
    case METAR :
        name = "METAR";
        break;
    case SPECI :
        name = "SPECI";
        break;
    case MESSAGE_TYPE_UNKNOWN :
        name = "MESSAGE_TYPE_UNKNOWN";
        break;
    }

    return name;
}
/*
 * Determines the kind of surface message from the heading's TT and AA
 * designators and from the PIL string.
 *
 * A PIL starting with "/pMTR" or "/pMTT" gives METAR (and is logged).
 * NOTE(review): get_wmo_header() stores the PIL with a leading blank
 * (" /p..."), so this test may never succeed -- confirm.
 *
 * Otherwise, for TT starting with 'S' or 'F', the second letter decides:
 *   I, M, N  SHIP if AA is W or V followed by one of A B C D E F J X,
 *            else SYNOP
 *   A        METAR (might be an 'sao')
 *   X        METAR if AA is "US"
 *   P        SPECI
 * Everything else yields MESSAGE_TYPE_UNKNOWN.
 */
message_type_t
decode_type(char *tt, char *aa, char *pil)
{
    if( pil[0] == '/' && pil[1] == 'p' && pil[2] == 'M' && pil[3] == 'T'
        && (pil[4] == 'R' || pil[4] == 'T'))
    {
        uerror("HDR + PIL: %s%s %s", tt, aa, pil ) ;
        return METAR;
    }

    if( tt[0] != 'S' && tt[0] != 'F')
        return MESSAGE_TYPE_UNKNOWN;

    /* Table B1 */
    switch( tt[1] ) {
    case 'A' :
        return METAR; /* might be an 'sao' */
    case 'P' :
        return SPECI;
    case 'X' :
        if(aa[0] == 'U' && aa[1] == 'S')
            return METAR; /* 'sao' */
        return MESSAGE_TYPE_UNKNOWN;
    case 'I' :
    case 'M' :
    case 'N' :
        /* table C2; strchr() would also match the terminator, so exclude it */
        if((aa[0] == 'W' || aa[0] == 'V')
            && aa[1] != '\0' && strchr("ABCDEFJX", aa[1]) != NULL)
            return SHIP;
        return SYNOP;
    default :
        return MESSAGE_TYPE_UNKNOWN;
    }
}
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: wmo_header.h,v 1.11 1999/06/03 21:13:26 rkambic Exp $ */
#ifndef _WMO_HEADER_H_
#define _WMO_HEADER_H_
#include "xbuf.h"
#include "dtime.h"
/* Kinds of surface message, as determined by decode_type(). */
typedef enum {
    MESSAGE_TYPE_UNKNOWN = 0,
    SYNOP = 1, /* FM 12 */
    SHIP = 2,  /* FM 13 */
    METAR = 3, /* FM 15 or SAO!! */
    SPECI = 4  /* FM 16 */
} message_type_t;
/* Meaning of the BBB group of an abbreviated heading. */
typedef enum {
    ORIGINAL = 0, /* no BBB group */
    RTD = 1,      /* delayed */
    COR = 2,      /* correction */
    AMD = 3,      /* amended */
    PIE = 4       /* pieces */
} retransmit_t;
/* abbreviated heading, 2.3.2 */
/*
 * abbreviated heading, 2.3.2
 *
 * The character arrays hold NUL terminated strings; each is one larger
 * than the number of characters get_wmo_header() reads into it.
 */
typedef struct {
    /* Tsub1Tsub2 : Data type and/or form */
    char TT[3];
    /* Asub1Asub2 : Geograph. and/or time */
    char AA[3];
    /* the ii number; get_wmo_header() stores 0 when it reads -1 */
    int ii;
    /* station of origin or compilation */
    char CCCC[5];
    /* empty, or " /p" followed by the identifier found after the heading */
    char PIL[10];
    /* time from the heading; the storage is provided by the caller */
    dtime *time;
    /* BBB delay, correction or amendment ind */
    retransmit_t retransmit;
    /* the sequence from BBB */
    int retrans_seq;
} wmo_header_t;
/* Frees a header and the time it points to; NULL is accepted. */
extern void free_wmo_header(wmo_header_t *hdr);
/* NOTE(review): no definition of new_wmo_header() appears in wmo_header.c as shown. */
extern wmo_header_t *new_wmo_header(char **line);
/* Decodes the heading at the current position of 'buf' into 'hdr'; NULL if the buffer runs out. */
extern wmo_header_t *get_wmo_header(xbuf *buf, wmo_header_t *hdr);
/* Formats a header into a static buffer. */
extern char *
s_wmo_header(wmo_header_t *hdr);
/*
extern int
fprint_wmo_header(FILE *fp , const wmo_header_t *hdr);
*/
/* String form of the BBB indicator; NULL when the header is an ORIGINAL. */
extern char *sRetransmit(wmo_header_t *hdr );
/* Name of a message type as a string constant. */
extern char *sMessage_type(message_type_t type);
/* Classifies a bulletin from its TT and AA designators and PIL string. */
extern message_type_t decode_type(char *tt, char *aa, char *PIL);
#endif /* _WMO_HEADER_H_ */