[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: FXUS43 continued
- Subject: Re: FXUS43 continued
- Date: Mon, 6 Mar 2000 11:10:43 -0700 (MST)
Greg,
I didn't send the latest code; here's the code that will work. Use these
files in the src/pqsurf directory.
Robb...
Output:
zero.unidata.ucar.edu.rkambic> m 00030617.wmo
METAR^M
KHOX 1720Z 061719Z AUTO 20012KT 16/00 A2982 RMK A01 T0161 PCPN 000=^M
METAR^M
KCBK 1729Z 061726Z AUTO 18016G26KT 16/M01 A2979 RMK A01 T0161 PCPN 000=^M
METAR^M
KCBK 1744Z 061743Z AUTO 20017G24KT 17/01 A2979 RMK A01 T0167 PCPN 000=^M
METAR^M
KHOX 1750Z 061749Z AUTO 18012KT 18/M02 A2983 RMK A01 T0178 PCPN 000=^M
METAR^M
KTRB 1756Z 061752Z 22016KT 18/M02 RMK AO1 T01831017=^M
METAR^M
KYMA 1757Z 061753Z 23008KT 14/M06 RMK AO1 T01441061=^M
METAR^M
KGNL 1758Z 061758Z AUTO 27008KT 18/01 A2988 RMK A01 T01800010 PCPN000=^M
METAR^M
KNRN 1758Z 061758Z AUTO 16010KT 17/00 A2986 RMK A01 T017000000 PCPN000=^M
METAR^M
KSTR 1758Z 061758Z AUTO 14012KT 16/M01 A2978 RMK A01 T01601010 PCPN000=^M
METAR^M
KCBK 1759Z 061758Z AUTO 20019KT 17/01 A2980 RMK A01 T0172 PCPN 000=^M
On Fri, 3 Mar 2000, Greg Thompson wrote:
>
> Robb,
>
> thanks so much for helping with the FXUS43 issue. I compiled your latest
> code into our version of pqsurf but I'm still not getting any data with
> those headers into my "metar_gdb" files. Here's what I'm using in the
> ldmd.conf file:
>
> exec "pqsurf -l /data/ldm/logs/pqsurf.log -f WMO -p ^(S[AP]|FXUS43) -Q
> /data/ldm...
>
> and here's the important line from pqsurf.conf:
>
> WMO ^metar (....) ([0-3][0-9])([0-2][0-9]) DBFILE /data/ldm/ddp........
>
> Also, there are new METARs coming out of Cheyenne, WY and look like:
>
> 344 ^M^M
> SAUS45 KCYS 032002^M^M
> MTRCTD^M^M
> METAR KCTD 031945Z AUTO 23010G13KT 07/M06 RMK AO1 RH% = 42 ^M^M
> ^M^M
> ^M^M
> ^M^M
> ^C
>
> Even though these clearly have "SA" as their starting header, they are
> also not appearing in my "metar_gdb" files. I'd very much like to get
> these into our system appropriately. Can you help?
>
> Thanks!
>
> --
> +--------------------------------------------------------------+
> | Greg Thompson http://www.rap.ucar.edu/staff/gthompsn/ |
> | Research Applications Program |
> | (303) 497-2805 National Center for Atmospheric Research |
> | (fax) -8401 P.O. Box 3000 Boulder, CO 80307-3000 |
> +--------------------------------------------------------------+
>
===============================================================================
Robb Kambic Unidata Program Center
Software Engineer III Univ. Corp for Atmospheric Research
address@hidden WWW: http://www.unidata.ucar.edu/
===============================================================================
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: wmo_header.h,v 1.11 1999/06/03 21:13:26 rkambic Exp $ */
#ifndef _WMO_HEADER_H_
#define _WMO_HEADER_H_
#include "xbuf.h"
#include "dtime.h"
/*
 * Kind of surface bulletin, as returned by decode_type().
 * The FM numbers are the code-form labels given by the original author.
 */
typedef enum {
    MESSAGE_TYPE_UNKNOWN = 0,   /* heading not recognized */
    SYNOP                = 1,   /* FM 12 */
    SHIP                 = 2,   /* FM 13 */
    METAR                = 3,   /* FM 15 or SAO!! */
    SPECI                = 4    /* FM 16 */
} message_type_t;
/*
 * Retransmission indicator decoded from the optional BBB group of a
 * WMO heading.
 */
typedef enum {
    ORIGINAL = 0,   /* no BBB indicator recognized */
    RTD      = 1,   /* delayed */
    COR      = 2,   /* correction */
    AMD      = 3,   /* amended */
    PIE      = 4    /* pieces */
} retransmit_t;
/* abbreviated heading, 2.3.2 */
/*
 * Decoded fields of one WMO abbreviated heading line, as filled in by
 * get_wmo_header().  The character arrays each leave one byte for the
 * terminating NUL.
 */
typedef struct {
char TT[3]; /* Tsub1Tsub2 : Data type and/or form */
char AA[3]; /* Asub1Asub2 : Geograph. and/or time */
int ii; /* two digit number after AA; get_wmo_header() stores 0 when it reads -1 */
char CCCC[5]; /* station of origin or compilation */
char PIL[10]; /* empty, or " /p" followed by the product identifier token (only set when usePil is 1) */
dtime *time; /* set from the YY, GG, gg groups of the heading via set_dtime() */
retransmit_t retransmit; /* BBB delay, correction or amendment ind */
int retrans_seq; /* the sequence from BBB */
} wmo_header_t;
/* Frees hdr->time with free_dtime() and then hdr itself; NULL is ignored. */
extern void free_wmo_header(wmo_header_t *hdr);
/* NOTE(review): no definition of new_wmo_header() is visible in wmo_header.c as reviewed -- confirm it exists elsewhere. */
extern wmo_header_t *new_wmo_header(char **line);
/* Parses a heading from buf into hdr; returns hdr, or NULL when the buffer ends (EOB) first. */
extern wmo_header_t *get_wmo_header(xbuf *buf, wmo_header_t *hdr);
/* Formats hdr as text in a static buffer that is overwritten by the next call. */
extern char *
s_wmo_header(wmo_header_t *hdr);
/*
extern int
fprint_wmo_header(FILE *fp , const wmo_header_t *hdr);
*/
/* Returns the BBB text for hdr (static buffer or string constant), or NULL when there is none. */
extern char *sRetransmit(wmo_header_t *hdr );
/* Returns the enumerator name of type as a string constant, or NULL for a value outside the enum. */
extern char *sMessage_type(message_type_t type);
/* Classifies a bulletin from its TT and AA groups and PIL string. */
extern message_type_t decode_type(char *tt, char *aa, char *PIL);
#endif /* _WMO_HEADER_H_ */
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: wmo_header.c,v 1.30 1999/06/21 22:15:21 rkambic Exp $ */
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "wmo_header.h"
#include "alloc.h"
#include "tokens.h"
#include "xbuf.h"
/*
 * When 1, get_wmo_header() also tries to read an AFOS-like PIL token from
 * the text following the heading line and stores it in hdr->PIL.
 * Off by default; pqsurf.c's main() sets it to 1.
 */
int usePil=0;
/*
 * Release a header obtained from the heap: its time member goes to
 * free_dtime() and the structure itself to free().
 * Passing NULL is a no-op.
 */
void
free_wmo_header(wmo_header_t *hdr)
{
    if(hdr != NULL)
    {
        free_dtime(hdr->time);
        free(hdr);
    }
}
/*
 * Reset *hdr to the "nothing parsed yet" state: empty strings, ii of 0,
 * retransmit ORIGINAL with no sequence, and a cleared time.
 *
 * hdr->time is passed to clear_dtime() as is.
 * NOTE(review): whether clear_dtime() accepts NULL is not visible here;
 * callers in this file always supply a dtime.
 */
static void
clear_wmo_header(wmo_header_t *hdr)
{
    hdr->TT[0] = '\0';
    hdr->AA[0] = '\0';
    hdr->ii = 0;
    hdr->CCCC[0] = '\0';
    hdr->PIL[0] = '\0';

    hdr->retransmit = ORIGINAL;
    /* retrans_seq is a plain int, so use 0 rather than a message_type_t constant */
    hdr->retrans_seq = 0;

    clear_dtime(hdr->time);
}
/*
 * Parse a WMO abbreviated heading (TT AA ii CCCC YYGGgg [BBB]) from buf
 * into *hdr.  When the global usePil is 1, the token that follows the
 * heading line is also examined and, if it looks like a 5 or 6 character
 * product identifier (optionally prefixed by "^NMC"), stored in hdr->PIL
 * as " /p<token>".
 *
 * hdr is cleared first, so hdr->time must already be valid for
 * clear_dtime()/set_dtime().
 *
 * Returns hdr, or NULL if the buffer ran out (EOB) before the heading
 * line had been read.
 */
wmo_header_t *
get_wmo_header(xbuf *buf, wmo_header_t *hdr)
{
    clear_wmo_header(hdr);

    if( get_wstr(buf, hdr->TT, 2) == EOB ) return NULL;
    if( get_wstr(buf, hdr->AA, 2) == EOB ) return NULL;
    if( get_wnum(buf, &hdr->ii, 2) == EOB ) return NULL;
    if(hdr->ii == -1) hdr->ii = 0;
    if( get_wstr(buf, hdr->CCCC, 4) == EOB ) return NULL;

    {
        int YY, GG, gg;
        if( get_wnum(buf, &YY, 2) == EOB ) return NULL;
        if( get_wnum(buf, &GG, 2) == EOB ) return NULL;
        if( get_wnum(buf, &gg, 2) == EOB ) return NULL;
        /* uses current time on parse errors */
        hdr->time = set_dtime(hdr->time, YY,GG,gg);
    }

    /* decode BBB field */
    { /* inline */
        char line[16];
        char pilstr[16];
        int allnum, nonalph, ich;

        /*
         * NOTE(review): get_wline() is handed the full array size; whether
         * it counts the terminator in that length is not visible here.
         */
        if( get_wline(buf, line, sizeof(line)) == EOB ) return NULL;
        { /* inline inner */
            const char *cp = &line[0];
            const char *const end = &line[strlen(line)];
            /* N.B. twisted flow here */
            for(cp = &line[0]; *cp != 0 && *cp != CR && cp + 2 < end; cp++)
            {
                switch( cp[0] ) {
                case 'R' :
                    if(cp[1] == 'R')
                    {
                        hdr->retransmit = RTD;
                        hdr->retrans_seq = cp[2];
                    }
                    else if(cp[1] == 'T')
                    {
                        hdr->retransmit = RTD;
                    }
                    break;
                case 'C' :
                    if(cp[1] == 'C')
                    {
                        hdr->retransmit = COR;
                        hdr->retrans_seq = cp[2];
                    }
                    else if(cp[1] == 'O')
                    {
                        hdr->retransmit = COR;
                    }
                    break;
                case 'A' :
                    if(cp[1] == 'A')
                    {
                        hdr->retransmit = AMD;
                        hdr->retrans_seq = cp[2];
                    }
                    else if(cp[1] == 'M')
                    {
                        hdr->retransmit = AMD;
                    }
                    break;
                case 'P' :
                    /* two letters encode a base-26 piece number */
                    hdr->retransmit = PIE;
                    hdr->retrans_seq = (cp[1] - 'A') * 26
                         + (cp[2] - 'A');
                    goto done;
                case 0 :
                    break;
                default :
                    continue; /* loop, look some more */
                }
                /* only arrive here if we got an acceptable BBB string */
                if(hdr->retrans_seq == 0)
                {
                    int tmp;
                    /* see if they used old style, eg RTD01 */
                    cp += 3;
                    tmp = atoi(cp);
                    /* Attachment II-14 */
                    if(tmp >= 0 && tmp < 24 )
                        hdr->retrans_seq = tmp + 'A';
                    else
                        hdr->retrans_seq = 'Y';
                }
        done:
                break;
            } /* end for */
        } /* end inline inner */

        /*
         * Every other get_wstr() call in this code passes a length one
         * less than the destination array, so leave room for the
         * terminator here as well.
         */
        if((usePil == 1) && ( get_wstr(buf, pilstr, sizeof(pilstr) - 1) != EOB ))
        {
            if((pilstr[0] != '\0') && ( get_wline(buf, line, sizeof(line)) != EOB ))
            {
                const size_t pillen = strlen(pilstr);

                /* the token must be alone on its line and 5 or 6 characters long */
                if((line[0] == '\0') && (pillen > 4) && (pillen < 7))
                {
                    nonalph = 0; allnum = 1;
                    for(ich = 0; ich < (int)pillen; ich++)
                    {
                        if(isalnum((unsigned char)pilstr[ich]) == 0) nonalph = 1;
                        if(isdigit((unsigned char)pilstr[ich]) == 0) allnum = 0;
                    }
                    /* " /p" + at most 6 characters + NUL fills PIL[10] exactly */
                    if((nonalph == 0)&&(allnum == 0))
                        sprintf(hdr->PIL, " /p%s", pilstr);
                }
                else if((line[0] == '\0')&&(strncmp(pilstr,"^NMC",4) == 0)&&
                    (strlen(pilstr+4) > 4)&&(strlen(pilstr+4) < 7))
                    sprintf(hdr->PIL, " /p%s", pilstr+4);
            }
        }
    } /* end inline */
    return hdr;
}
/*
 * Render the BBB indicator of hdr as text.
 *
 * RTD, COR and AMD give "RRx", "CCx" or "AAx" when retrans_seq is a
 * letter 'A'..'Z', otherwise the plain "RTD", "COR" or "AMD".  PIE gives
 * "P" followed by two letters for the piece number, with out-of-range
 * numbers clamped to 675 ("PZZ").  Anything else (including ORIGINAL)
 * gives NULL.
 *
 * Composed results live in a static buffer reused by the next call.
 */
char *
sRetransmit(wmo_header_t *hdr)
{
    static char text[4];
    int seq = hdr->retrans_seq;

    if(hdr->retransmit == PIE)
    {
        if(seq < 0 || seq > 675) /* 26 * 26 -1 */
            seq = 675;
    }
    else if(seq < 'A' || seq > 'Z')
    {
        seq = 0;
    }

    switch ( hdr->retransmit ) {
    case RTD :
        if(seq == 0)
            return "RTD";
        sprintf(text,"RR%c", seq);
        break;
    case COR :
        if(seq == 0)
            return "COR";
        sprintf(text,"CC%c", seq);
        break;
    case AMD :
        if(seq == 0)
            return "AMD";
        sprintf(text,"AA%c", seq);
        break;
    case PIE :
        sprintf(text,"P%c%c", seq/26 + 'A', seq%26 + 'A');
        break;
    default :
        return NULL;
    }
    return text;
}
/*
 * Format hdr as "TTAAii CCCC DDHHMM[ BBB][ /pPIL]" and return a pointer
 * to the result.  A missing time is shown as the literal "DDHHMM".
 *
 * The text lives in a static buffer that the next call overwrites.
 */
char *
s_wmo_header(wmo_header_t *hdr)
{
#ifndef KEYSIZE
#define KEYSIZE 255
#endif
    static char sp[KEYSIZE+1];
    char *cp = sp;

    (void) memset(sp,0,sizeof(sp));

    sprintf(cp,
        "%2s%2s%02d %4s",
        hdr->TT, hdr->AA, hdr->ii, hdr->CCCC);
    /* advance by what was actually written rather than an assumed width */
    cp += strlen(cp);

    if(hdr->time != NULL)
    {
        sprintf(cp,
            " %02d%02d%02d",
            hdr->time->mday, hdr->time->hour,
            hdr->time->min );
    }
    else
    {
        sprintf(cp, " DDHHMM");
    }
    cp += strlen(cp);

    if(hdr->retransmit != ORIGINAL)
    {
        /* sRetransmit() returns NULL for values it does not know */
        const char *bbb = sRetransmit(hdr);
        if(bbb != NULL)
            sprintf(cp, " %s", bbb);
    }

    if(hdr->PIL[0] != '\0')
        strcat(sp,hdr->PIL);
    return sp;
}
/*
 * Disabled (#if 0): write hdr to fp in the same "TTAAii CCCC DDHHMM[ BBB]"
 * layout that s_wmo_header() produces, without the PIL.  Returns ferror(fp).
 * NOTE(review): surf_split() still calls this under "#if DEBUG", and the
 * prototype in wmo_header.h is commented out -- a DEBUG build presumably
 * will not link until this is re-enabled; confirm.
 */
#if 0
int
fprint_wmo_header(FILE *fp, const wmo_header_t *hdr)
{
#if 1
fprintf(fp,
"%2s%2s%02d %4s",
hdr->TT, hdr->AA, hdr->ii, hdr->CCCC);
if(hdr->time != NULL)
{
fprintf(fp,
" %02d%02d%02d",
hdr->time->mday, hdr->time->hour,
hdr->time->min );
}
else
{
/* no time available: print the placeholder */
fprintf(fp, " DDHHMM");
}
if(hdr->retransmit != ORIGINAL)
fprintf(fp, " %s", sRetransmit(hdr));
#else
fputs( s_wmo_header(hdr) , fp );
#endif
return ferror(fp);
}
#endif
/*
 * Alternate clear_wmo_header(), compiled only when USED is defined non-zero.
 * Unlike the active version earlier in this file it frees hdr->time and
 * sets it to NULL, and it does not reset hdr->PIL.
 * NOTE(review): enabling it would define clear_wmo_header() twice in this
 * translation unit -- confirm it is meant to stay disabled.
 */
#if USED
static void
clear_wmo_header(wmo_header_t *hdr)
{
hdr->TT[0] = hdr->AA[0] = hdr->ii =
hdr->CCCC[0] = hdr->retransmit = hdr->retrans_seq = 0;
free_dtime(hdr->time);
hdr->time = NULL;
}
#endif /* USED */
/*
 * Map a message_type_t to its enumerator name as a string constant.
 * Values outside the enum yield NULL.
 */
char *
sMessage_type(message_type_t type)
{
    char *name = NULL;  /* stays NULL for anything not listed below */

    switch(type) {
    case MESSAGE_TYPE_UNKNOWN :
        name = "MESSAGE_TYPE_UNKNOWN";
        break;
    case SYNOP :
        name = "SYNOP";
        break;
    case SHIP :
        name = "SHIP";
        break;
    case METAR :
        name = "METAR";
        break;
    case SPECI :
        name = "SPECI";
        break;
    }
    return name;
}
/*
 * Classify a bulletin from its heading.
 *
 * tt  - the two character TT group
 * aa  - the two character AA group
 * pil - the PIL string from the header (may be empty)
 *
 * A PIL beginning "/pMTR" or "/pMTT" is logged and reported as METAR.
 * Otherwise only TT groups starting with 'S' or 'F' are recognized:
 *   second letter I, M or N : SHIP when AA is W or V followed by one of
 *                             A B C D E F J X (table C2), else SYNOP
 *   second letter A         : METAR (might be an 'sao')
 *   second letter X         : METAR when AA is "US" ('sao')
 *   second letter P         : SPECI
 * Everything else is MESSAGE_TYPE_UNKNOWN.
 *
 * NOTE(review): get_wmo_header() writes the PIL as " /p..." with a leading
 * blank, so the "/p" test at offset 0 presumably never matches its output;
 * confirm the intended offset before relying on the PIL branch.
 */
message_type_t
decode_type(char *tt, char *aa, char *pil)
{
    const char second = (tt[0] == 'S' || tt[0] == 'F') ? tt[1] : '\0';

    if(strncmp(pil, "/pMT", 4) == 0 && (pil[4] == 'R' || pil[4] == 'T'))
    {
        uerror("HDR + PIL: %s%s %s", tt, aa, pil ) ;
        return METAR;
    }

    /* Table B1 */
    if(second == 'A')
        return METAR;
    if(second == 'P')
        return SPECI;
    if(second == 'X')
    {
        if(aa[0] == 'U' && aa[1] == 'S')
            return METAR;
        return MESSAGE_TYPE_UNKNOWN;
    }
    if(second == 'I' || second == 'M' || second == 'N')
    {
        /* table C2; the NUL guard keeps strchr() from matching its terminator */
        if((aa[0] == 'W' || aa[0] == 'V')
                && aa[1] != '\0'
                && strchr("ABCDEFJX", aa[1]) != NULL)
            return SHIP;
        return SYNOP;
    }
    return MESSAGE_TYPE_UNKNOWN;
}
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: surf_split.c,v 1.30 1999/12/02 23:21:26 rkambic Exp $ */
#include <ldmconfig.h>
#include <stdio.h>
#include <ctype.h>
#include <string.h>
#include "ldm.h"
#include "ulog.h"
#include "wmo_header.h"
#include "tokens.h"
#include "xbuf.h"
#include "surface.h" /* wind_units_t, CALL_SIGN_LEN */
#include "md5.h"
/*
 * Backing storage for the MD5 context that surf_split() uses to sign each
 * split observation; always accessed through md5ctxp.
 * NOTE(review): presumably declared as double to obtain suitable alignment
 * for MD5_CTX, and sized from the author's estimate below -- confirm both
 * against the MD5_CTX definition in md5.h.
 */
static double md5ctx[16]; /* 88 would be big enough */
static MD5_CTX *md5ctxp = (MD5_CTX *)md5ctx;
/*
 * Read a two digit YY group and a two digit GG group from buf and store
 * them in *time through set_dtime(), with the minute argument set to 0.
 *
 * Returns the negative status of whichever read failed (leaving *time
 * untouched), otherwise the status of the second read.
 */
static int
get_yygg(xbuf *buf, dtime *time)
{
    int day = -1;
    int hour = -1;
    int rc;

    rc = dget_wnum(buf, &day, 2);
    if(rc >= 0)
    {
        rc = dget_num(buf, &hour, 2);
        if(rc >= 0)
            set_dtime(time, day, hour, 0);
    }
    return rc;
}
/*
 * For METAR: skip leading white space in buf, then report whether the
 * next five characters are four digits followed by 'Z' (an HHMMZ group).
 * Only the white space is consumed.  Returns 1 if present, else 0.
 */
static int
whas_yyggZ(xbuf *buf)
{
    int c;
    int i;

    /* skip white space */
    for(;;)
    {
        c = nextc(buf);
        if(!isascii(c) || isgraph(c))
            break;
    }
    unnextc(buf,c);

    if(buf->cnt < 5)
        return 0; /* not enough characters */

    for(i = 0; i < 4; i++)
    {
        if(!isdigit(buf->get[i]))
            return 0;
    }
    return buf->get[4] == 'Z';
}
/*
 * For METAR: report whether the last three bytes of buf are "NIL".
 * Buffers shorter than three bytes cannot hold it and yield 0, which
 * also keeps the comparison from starting before buf->base.
 * Returns 1 on a match, else 0.
 */
static int
has_NIL(xbuf *buf)
{
    static const char nilstr[] = "NIL";
    const int nillen = (int)(sizeof(nilstr) - 1);
    const char *np;

    if(buf->bufsiz < nillen)
        return 0;

    np = (const char *)&buf->base[buf->bufsiz - nillen];
    if(strncmp(np, nilstr, sizeof(nilstr) -1) == 0)
        return 1;
    return 0;
}
/*
 * For METAR: if an HHMMZ group comes next in buf, read it into *time
 * with get_yygg() and consume the trailing 'Z'.  Otherwise *time is left
 * alone.
 */
static void
get_wyyggZ(xbuf *buf, dtime *time)
{
    if(whas_yyggZ(buf))
    {
        (void)get_yygg(buf, time);
        (void)nextc(buf); /* eat the 'Z' */
    }
}
/*
* Takes a WMO format product which is a
* SAO, SYNOP, SHIP, METAR, or SPECI message, splits it into
* individual observations. The observations are each encapsulated in a
* new product which inherits most of its description from the
* original product.
* The new product pkey is derived from the observation type
* and has the following form:
*
* SAO - "sao tt ccc ddhhmm"
* where:
* tt is SA, SP or RS
* ccc is the station ID like SFO, LXV, etc
* ddhhmm is the time stamp.
*
* SYNOP - "aaxx nnnnn ddhhmm"
* where:
* nnnnn is the WMO station id (5 digit number)
*
* SHIP - "bbxx c* ddhhmm"
* where:
* c* is the call sign
*
* METAR - "metar cccc ddhhmm"
* where:
* cccc is the call sign
*
* SPECI - "speci cccc ddhhmm"
*
* The new product sequence number is original sequence number times 1000
* plus the sequence of the individual observation within the product.
*
* 'doit' is called on each of the new products. It is presumed
* this function return zero upon success.
*
* Returns the number of successful calls to 'doit', eg, the
* number of splits. Returns -1 on error.
*/
int
surf_split(const prod_info *infop, const void *datap,
int (*doit)(const prod_info *, const void *))
{
int action = -1;
wmo_header_t hdr;
message_type_t mtype;
dtime dt;
xbuf buf[1];
unsigned char dbuf[8192]; /* TODO */
int nsplit = 0;
/* how the body of the bulletin is laid out; chosen from mtype below */
enum {
SURFACE_BOGUS ,
AAXX,
US_AAXX,
BBXX,
SAO,
sMETAR,
sSPECI
} subtype = SURFACE_BOGUS;
/* the header's time storage lives on this stack frame */
hdr.time = &dt;
/* products that do not fit in dbuf are rejected outright */
if(infop->sz > sizeof(dbuf))
return -1; /* TODO: too big */
/* parse a private copy so the caller's data is never modified */
memcpy(dbuf, datap, infop->sz);
if( cbuftoxbuf(buf, (unsigned char *)dbuf,
infop->sz) == NULL)
return -1;
skipline(buf, 4); /* SOH */
skipline(buf, 12); /* start */
if( get_wmo_header(buf, &hdr) == NULL)
{
return -1;
}
/*
 * NOTE(review): fprint_wmo_header() is wrapped in "#if 0" in wmo_header.c,
 * so a DEBUG build presumably fails to link -- confirm.
 */
#if DEBUG
fputs("\t", stderr);
fprint_wmo_header(stderr, &hdr);
fputs("\n", stderr);
#endif
mtype = decode_type(hdr.TT,hdr.AA,hdr.PIL);
/* #### */
{
char cbuf[8];
int digit;
dtime time;
wind_units_t wind_units = WIND_UNAVAIL;
time = *hdr.time; /* default the ob time to the time in the header */
/* delve into section 0 */
switch(mtype) {
case SYNOP :
if(get_wstr(buf, cbuf, 1) < 0 ) return -1;
if(cbuf[0] == 'A')
{
subtype = AAXX;
if(get_str(buf, &cbuf[1], 3) < 0 ) return -1;
if( cbuf[3] != 'X' )
{
/* punt */
uerror("surface_split: Unknown type: %s\n",
cbuf);
return 0;
}
if(get_yygg(buf, &time) < 0 ) return -1; /* YYGG */
if(dget_num(buf, &digit, 1) < 0 ) return -1; /* isubw */
if(digit >= 0 && digit <= 4) wind_units =
(wind_units_t)digit;
}
else if(isascii(cbuf[0]) && isdigit(cbuf[0])) /* US Stations
7NNNN */
{
unnextc(buf,cbuf[0]);
subtype = US_AAXX;
/*
* Some US reports leave off AAXX YYGGisubw, so we use
the
* time from the wmo header.
*/
wind_units = KNOTS;
}
else
{
unnextc(buf,cbuf[0]);
return 0; /* ?? */
}
break;
case SHIP :
if(get_wstr(buf, cbuf, 4) < 0 ) return -1;
if(cbuf[0] == 'B')
{
if( cbuf[3] != 'X' )
{
/* punt */
uerror("surface_split: Unknown type: %s\n",
cbuf);
return 0;
}
subtype = BBXX;
/* get time below */
}
else
{
unnextc(buf,cbuf[0]);
return 0;
}
break;
case METAR :
if(whasSTR(buf, "METAR"))
{
subtype = sMETAR;
get_wyyggZ(buf, &time);
}
/*
 * NOTE(review): get_wmo_header() stores the PIL as " /p..." so PIL[0]
 * is presumably never 'M'; confirm whether this branch can be reached.
 */
else if(hdr.PIL[0] == 'M' && hdr.PIL[1] == 'T')
{
uerror("HDR + PIL: %s%s %s", hdr.TT, hdr.AA,
hdr.PIL ) ;
/* skip 6 char PIL */
if(get_wstr(buf, cbuf, CALL_SIGN_LEN) < 0)
return 0;
subtype = sMETAR;
get_wyyggZ(buf, &time);
}
else
subtype = SAO; /* may actually be a METAR, check below
*/
break;
case SPECI :
/* without the "SPECI" keyword subtype stays SURFACE_BOGUS and every ob is skipped by the safety net below */
if(whasSTR(buf, "SPECI"))
{
subtype = sSPECI;
get_wyyggZ(buf, &time);
}
break;
default :
uerror("surface_split: Can't handle %s",
sMessage_type(mtype) );
uerror("HDR + PIL: %s%s %s", hdr.TT, hdr.AA, hdr.PIL ) ;
return -1;
}
{ /* while block */
/* newkey is static and newinfo.ident points at it, so each iteration overwrites the previous key */
static char newkey[KEYSIZE];
xbuf subbuf[1];
prod_info newinfo = *infop;
#define MAX_SURF_LEN 511
#undef MIN
#define MIN(a,b) ((a) <= (b) ? (a) : (b))
char pbuf[MAX_SURF_LEN + 1];
int l1, l2;
static char ident[CALL_SIGN_LEN+1];
static char type[4];
/* sub-products are numbered from the original sequence number times 1000 */
u_int subseq = infop->seqno * 1000;
unsigned char *pp;
/* NOTE(review): get_weqxbuf() presumably hands back the next '='-terminated observation in subbuf -- confirm */
while( get_weqxbuf(buf, subbuf) > 0 )
{
(void)memset(newkey,0,KEYSIZE);
(void)memset(pbuf,0,MAX_SURF_LEN + 1);
(void)memset(ident,0,CALL_SIGN_LEN+1);
/* pp marks where the copied observation text starts; some cases move it past a tag */
pp = subbuf->base;
switch(subtype) {
case AAXX :
case US_AAXX :
strcpy(newkey, "aaxx ");
strcpy(pbuf, "AAXX");
sprintf(&pbuf[strlen(pbuf)], " %02d%02d%1d\r\r\n",
time.mday, time.hour, (int)wind_units);
/* WMO station no. */
if(get_wstr(subbuf, ident, 5) < 0)
continue;
strcat(newkey, ident);
break;
case BBXX :
strcpy(newkey, "bbxx ");
strcpy(pbuf, "BBXX\r\r\n");
/* call sign */
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
strcat(newkey, ident);
if(get_yygg(subbuf, &time) < 0) continue; /* YYGG */
break;
case sSPECI :
/* call sign */
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
if(strcmp(ident, "SPECI") == 0)
{
/* They package each ob with a tag */
pp = (subbuf->get +1);
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
}
if(!whas_yyggZ(subbuf))
{
/* Have to insert the date */
sprintf(pbuf, "SPECI\r\r\n%s %02d%02dZ ",
ident, time.hour, time.min);
pp = subbuf->get;
}
else
strcpy(pbuf, "SPECI\r\r\n");
strcpy(newkey, "speci ");
strcat(newkey, ident);
break;
case sMETAR :
/* observations that end in "NIL" are dropped */
if(has_NIL(subbuf))
continue;
/* call sign */
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
if(strcmp(ident, "METAR") == 0)
{
/* They package each ob with a tag */
pp = (subbuf->get +1);
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
}
if(!whas_yyggZ(subbuf))
{
/* Have to insert the date */
sprintf(pbuf, "METAR\r\r\n%s %02d%02dZ ",
ident, time.hour, time.min);
pp = subbuf->get;
}
else
strcpy(pbuf, "METAR\r\r\n");
strcpy(newkey, "metar ");
strcat(newkey, ident);
break;
case SAO :
/* call sign */
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
if(hdr.AA[0] == 'U' && hdr.AA[1] == 'S'
&& strlen(ident) == 6)
{
/* skip 6 char US "AFOS code" */
if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
continue;
}
/* SA, SP, RS, USP or XP */
if(get_wstr(subbuf, type, 3) < 0)
continue;
if((type[0] == 'S'
&& (type[1] == 'A' || type[1] == 'P'))
|| (type[0] == 'R' && type[1] == 'S')
|| (type[0] == 'U' && type[1] == 'S'
&& type[2] == 'P')
|| (type[0] == 'X' && type[1] == 'P')
|| (type[0] == 'T' &&
(type[1] == 'A' || type[1] == 'S'))
)
{
strcpy(newkey, "sao ");
strcat(newkey, type);
strcat(newkey, " ");
strcat(newkey, ident);
}
else if(isdigit(type[0]) && isdigit(type[1]))
{
/* it is a METAR really */
/* subtype is changed here, so the remaining obs of this product take the sMETAR case */
subtype = sMETAR;
strcpy(newkey, "metar ");
strcat(newkey, ident);
strcpy(pbuf, "METAR\r\r\n");
}
else
continue; /* don't know what it is, "NIL=" */
break;
}
/* safety net */
if(strlen(ident) == 0)
{
continue;
}
/* else */
sprintf(&newkey[strlen(newkey)], " %02d%02d%02d",
time.mday, time.hour, time.min);
if(hdr.retransmit != ORIGINAL)
sprintf(&newkey[strlen(newkey)], " %s",
sRetransmit(&hdr));
newinfo.ident = newkey;
newinfo.seqno = ++subseq;
/*
 * l1 is the length of the prefix already in pbuf.  l2 is the smaller of
 * the room left in pbuf (keeping 4 bytes for the "=\r\r\n" trailer) and
 * the bytes of the observation from pp to the end of subbuf.
 */
l1 = strlen(pbuf);
l2 = MIN(MAX_SURF_LEN - l1 - 4, subbuf->bufsiz - (pp -
subbuf->base));
/* N.B.: silent truncation */
strncat(pbuf, (char *)pp, l2 );
strcat(pbuf,"=\r\r\n");
/* the reported size assumes strncat() copied all l2 bytes */
newinfo.sz = l1 + l2 + 4;
#if DEBUG
fprintf(stderr,"\t\t%s\n", newinfo.ident);
#endif
#if PRINT
{
char *cp = pbuf;
char *end = &cp[newinfo.sz];
while(cp < end)
{
putc(*cp, stderr);
cp++;
}
}
putc('\n', stderr);
#endif
/* the new product's signature is the MD5 of the rebuilt text */
MD5Init(md5ctxp);
MD5Update(md5ctxp, (const unsigned char *)pbuf, newinfo.sz);
MD5Final(newinfo.signature, md5ctxp);
/*
* process the single ob in the requested fashion
*/
/* only calls for which doit() returns 0 are counted */
if((*doit)(&newinfo, pbuf) == 0)
nsplit++;
} /* end while */
#if PRINT
putc('\n', stderr);
#endif
} /* end while block */
} /* end #### block */
return nsplit;
}
/*
* Copyright 1993, University Corporation for Atmospheric Research
* See ../COPYRIGHT file for copying and redistribution conditions.
*/
/* $Id: pqsurf.c,v 1.51 1999/04/02 23:16:35 davis Exp $ */
/*
*
*/
/*
* Need to create a queue before running this:
* pqcreate -c -s 2M -S 13762 /usr/local/ldm/data/pqsurf.pq
*/
#include <ldmconfig.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rpc/rpc.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <sys/wait.h>
#include <assert.h>
#include <regex.h>
#include "ldm.h"
#include "atofeedt.h"
#include "ldmprint.h"
#include "ulog.h"
#include "pq.h"
#include "paths.h"
#include "surface.h"
#ifdef NO_ATEXIT
#include "atexit.h"
#endif
extern int usePil; /* 1/0 flag to signal use of AFOS like pil identifier */
/* default for the -i option: seconds between polls of the input queue */
#ifndef DEFAULT_INTERVAL
#define DEFAULT_INTERVAL 15
#endif
/* flags set by signal_handler() and polled by the main loop and expire() */
static volatile int done = 0;
static volatile int intr = 0;
static volatile int stats_req = 0;
/* default for the -p option */
#ifndef DEFAULT_PATTERN
#define DEFAULT_PATTERN "^S[AIMNP]"
#endif
/* default for the -f option */
#ifndef DEFAULT_FEEDTYPE
#define DEFAULT_FEEDTYPE (IDS|DDS)
#endif
/* input product queue: path (-q or LDMPQFNAME) and handle */
static const char *pqfname = DEFAULT_QUEUE;
static pqueue *pq = NULL;
/* set in paths.h by configure */
#ifndef DEFAULT_SURF_OUTQUEUE
#define DEFAULT_SURF_OUTQUEUE "/usr/local/ldm/data/pqsurf.pq"
#endif
/* output queue that receives the split observations: path (-Q) and handle */
static char *opqfname = DEFAULT_SURF_OUTQUEUE;
static pqueue *opq = NULL;
/* set in paths.h by configure */
#ifndef DEFAULT_SURF_CONFFILE
#define DEFAULT_SURF_CONFFILE "/usr/local/ldm/etc/pqsurf.conf"
#endif
/* set in paths.h by configure */
#ifndef DEFAULT_SURF_DATADIR
#ifndef DEFAULT_DATADIR
#define DEFAULT_SURF_DATADIR "/usr/local/ldm"
#else
#define DEFAULT_SURF_DATADIR DEFAULT_DATADIR
#endif
#endif
/* default shown for the -t option, which main() passes through to the child */
#ifndef DEFAULT_PIPE_TIMEO
#define DEFAULT_PIPE_TIMEO 60
#endif
/* default for the -a option, in hours: one hour plus one polling interval */
#ifndef DEFAULT_AGE
#define DEFAULT_AGE (1. + (double)(DEFAULT_INTERVAL)/3600.)
#endif
/* pid of the pqact child: 0 until main() stores run_child()'s result, -1 once reap_act() decides to exit */
static pid_t act_pid;
/*
 * Set to non-root privilege if possible.
 * Do it in such a way that it is safe to fork.
 * TODO: this is duplicated from ../server/priv.c
 *
 * A failed setuid() is reported with serror() instead of being ignored,
 * so a child that could not drop privilege is at least visible in the
 * log.  The function still returns normally in that case.
 */
static void
endpriv(void)
{
    const uid_t euid = geteuid();
    const uid_t uid = getuid();

    /* if either euid or uid is unprivileged, use it */
    if(euid > 0)
    {
        if(setuid(euid) != 0)
            serror("endpriv: setuid(%ld)", (long)euid);
    }
    else if(uid > 0)
    {
        if(setuid(uid) != 0)
            serror("endpriv: setuid(%ld)", (long)uid);
    }
    /* else warn??? or set to nobody??? */
}
/*
 * Fork and exec argv[0] (searched on PATH by execvp) with the given
 * arguments.  argv must end with a NULL entry, as execvp requires.
 *
 * The child restores default SIGCHLD and SIGTERM handling and drops
 * privilege with endpriv() before the exec; if the exec fails it logs
 * and exits with status 127.
 *
 * Returns the child's pid in the parent, or -1 if fork() failed.
 */
static pid_t
run_child(int argc, char *argv[])
{
    pid_t pid;

    if(ulogIsDebug())
    {
        /* build a printable command line; it is only used for this log message */
        char command[1024];
        size_t used = 0;
        int ii;

        command[0] = 0;
        for(ii = 0; ii < argc; ii++)
        {
            const size_t len = strlen(argv[ii]);

            /* room is needed for a separator, the word and the terminator */
            if(used + (ii > 0 ? 1 : 0) + len >= sizeof(command))
                break;  /* truncate the log text, never the real argv */
            if(ii > 0)
                command[used++] = ' ';
            memcpy(&command[used], argv[ii], len);
            used += len;
            command[used] = 0;
        }
        udebug("exec'ing: \"%s\"", command);
    }

    pid = fork();
    if(pid == -1)
    {
        serror("run_child: fork failed");
        return pid;
    }

    if(pid == 0)
    {   /* child */
        (void)signal(SIGCHLD, SIG_DFL);
        (void)signal(SIGTERM, SIG_DFL);

        /* keep same descriptors as parent */

        /* don't let child get real privilege */
        endpriv();

        (void) execvp(argv[0], &argv[0]);
        serror("run_child: execvp: %s", argv[0]);
        _exit(127);
    }
    /* else, parent */
    return pid;
}
/* running totals reported by dump_stats() */
static int nprods = 0; /* products handed to split_prod() */
static int nsplit = 0; /* calls to doOne(), whether or not the insert succeeded */
static int ndups = 0; /* inserts rejected with PQUEUE_DUP */
/* Log the three counters at notice level. */
static void
dump_stats(void)
{
unotice("Number of products %d", nprods);
unotice("Number of observations %d", nsplit);
unotice("Number of dups %d", ndups);
}
/* defined in surf_split.c */
/* Splits one product into observations, calling doit on each; returns the number of doit calls that returned 0, or -1 on error. */
extern int surf_split(const prod_info *infop, const void *datap,
int (*doit)(const prod_info *, const void *));
/*
 * Callback given to surf_split(): insert one split observation into the
 * output queue opq.
 *
 * nsplit is incremented for every call.  A PQUEUE_DUP result bumps ndups
 * and is logged only in verbose mode; any other failure is logged as an
 * error.
 *
 * Returns the status from pq_insertNoSig() (ENOERR on success).
 */
static int
doOne(const prod_info *infop, const void *datap)
{
    struct product prod;
    int status;

    if(ulogIsDebug())
        udebug("%s", s_prod_info(NULL, 0, infop, 1));

    prod.info = *infop;
    prod.data = (void *)datap; /* cast away const */

    nsplit++; /* ?? Do it here or only on success ?? */

    status = pq_insertNoSig(opq, &prod);
    if(status != ENOERR)
    {
        if(status == PQUEUE_DUP)
        {
            ndups++;
            if(ulogIsVerbose())
                uinfo("Product already in queue: %s",
                    s_prod_info(NULL, 0, &prod.info,
                         ulogIsDebug()));
        }
        else
        {
            uerror("pq_insert: %s\n", strerror(status));
        }
    }
    return status;
}
/*
 * pq_sequence() callback: split one product from the input queue into
 * individual observations (each inserted by doOne()) and then signal the
 * pqact child.
 *
 * infop, datap - the product's description and data
 * xprod, size  - unused here
 * vp           - optional size_t *; receives the split count when
 *                surf_split() did not fail
 *
 * Always returns 0.
 */
static int
split_prod(const prod_info *infop, const void *datap,
        void *xprod, size_t size, void *vp)
{
    size_t *nsp = (size_t *)vp;
    int ns;

    if(ulogIsVerbose())
        uinfo("%s", s_prod_info(NULL, 0, infop, ulogIsDebug()));

    ns = surf_split(infop, datap, doOne);

    nprods++;

    /*
     * kill() takes the pid first and the signal second.  Only signal a
     * real child: a pid of 0 or -1 would address whole groups of processes.
     * NOTE(review): SIGCONT is presumably meant to wake pqact from its
     * wait on the output queue -- confirm.
     */
    if(act_pid > 0)
        (void)kill(act_pid, SIGCONT);

    if(nsp != NULL && ns >= 0)
        *nsp = (size_t)ns;

    return 0;
}
static void
usage(const char *av0) /* id string */
{
(void)fprintf(stderr,
"Usage: %s [options] [confilename]\t\nOptions:\n",
av0);
(void)fprintf(stderr,
"\t-v Verbose, log each match (SIGUSR2 toggles)\n");
(void)fprintf(stderr,
"\t-x Debug mode\n");
(void)fprintf(stderr,
"\t-l logfile Send log info to file (default uses
syslogd)\n");
(void)fprintf(stderr,
"\t-d datadir cd to \"datadir\" before interpreting filenames
in\n");
(void)fprintf(stderr,
"\t conffile (default %s)\n",
DEFAULT_SURF_DATADIR);
(void)fprintf(stderr,
"\t-q queue default \"%s\"\n", DEFAULT_QUEUE);
(void)fprintf(stderr,
"\t-p pattern Interested in products matching \"pattern\"
(default \"%s\")\n", DEFAULT_PATTERN);
(void)fprintf(stderr,
"\t-f feedtype Interested in products from feed \"feedtype\"
(default %s)\n", s_feedtypet(DEFAULT_FEEDTYPE));
(void)fprintf(stderr,
"\t-i interval loop, polling each \"interval\" seconds
(default %d)\n", DEFAULT_INTERVAL);
(void)fprintf(stderr,
"\t-a age Expire products older than \"age\" hours
(default %.4f)\n", DEFAULT_AGE);
(void)fprintf(stderr,
"\t-t timeo set write timeo for PIPE subprocs to \"timeo\"
secs (default %d)\n", DEFAULT_PIPE_TIMEO);
(void)fprintf(stderr,
"\t-o offset the oldest product we will consider is
\"offset\" secs before now (default: most recent in output queue)\n");
(void)fprintf(stderr,
"\t-Q outQueue default \"%s\"\n", DEFAULT_SURF_OUTQUEUE);
(void)fprintf(stderr,
"\t(default conffilename is %s)\n",
DEFAULT_SURF_CONFFILE);
exit(1);
}
/*
 * Wait for the pqact child (act_pid) using the given waitpid() options and
 * log what happened to it.
 *
 * This function calls exit() itself when the child has exited (using the
 * child's exit status) or was killed by one of the signals listed in the
 * switch below (status 1); act_pid is set to -1 first in both cases.
 *
 * Returns -1 if the wait failed, 0 if no child status was collected,
 * otherwise the pid that was reaped.
 */
static pid_t
reap_act(int options)
{
pid_t wpid = 0;
int status = 0;
#ifndef NO_WAITPID
wpid = waitpid(act_pid, &status, options);
#else
/* without waitpid() only a blocking wait is done; any other options value leaves wpid at 0 */
if(options == 0)
wpid = wait(&status);
/* customize here for older systems, use wait3 or whatever */
#endif
if(wpid == -1)
{
/* ECHILD is expected, and stays silent, once act_pid has been set to -1 */
if(!(errno == ECHILD && act_pid == -1))
{
/* Only complain if relevant */
serror("waitpid");
}
return -1;
}
/* else */
if(wpid != 0)
{
/* tag_pid_entry(wpid); */
#ifndef NO_WAITPID
if(WIFSTOPPED(status))
{
unotice("child %d stopped by signal %d",
wpid, WSTOPSIG(status));
}
else if(WIFSIGNALED(status))
{
unotice("child %d terminated by signal %d",
wpid, WTERMSIG(status));
/* DEBUG */
/* signals not listed here are only logged; act_pid is left unchanged */
switch(WTERMSIG(status)) {
/*
* If a child dumped core,
* shut everything down.
*/
case SIGQUIT:
case SIGILL:
case SIGTRAP: /* ??? */
case SIGABRT:
#if defined(SIGEMT)
case SIGEMT: /* ??? */
#endif
case SIGFPE: /* ??? */
case SIGBUS:
case SIGSEGV:
#if defined(SIGSYS)
case SIGSYS: /* ??? */
#endif
#ifdef SIGXCPU
case SIGXCPU:
#endif
#ifdef SIGXFSZ
case SIGXFSZ:
#endif
act_pid = -1;
exit(1);
break;
}
}
else if(WIFEXITED(status))
{
/* a non-zero exit status is logged at notice level, zero only at debug level */
if(WEXITSTATUS(status) != 0)
unotice("child %d exited with status %d",
wpid, WEXITSTATUS(status));
else
udebug("child %d exited with status %d",
wpid, WEXITSTATUS(status));
act_pid = -1;
exit(WEXITSTATUS(status));
}
#endif
}
return wpid;
}
/*
 * atexit() handler: stop the pqact child if one is running, close both
 * product queues (logging the output queue's high-water marks), print
 * the counters and close the log.
 */
void
cleanup(void)
{
    unotice("Exiting");

    /*
     * act_pid is 0 until run_child() has been called and -1 once the child
     * is gone.  Only a positive pid names a child; kill(0, ...) would
     * signal this process's entire process group.
     */
    if(act_pid > 0)
    {
        (void)signal(SIGCHLD, SIG_IGN);
        kill(act_pid, SIGTERM);
        (void) reap_act(0);
    }

    if(opq != NULL)
    {
        off_t highwater = 0;
        size_t maxregions = 0;

        (void) pq_highwater(opq, &highwater, &maxregions);
        (void) pq_close(opq);
        opq = NULL;

        unotice(" Queue usage (bytes):%8ld",
             (long)highwater);
        unotice(" (nregions):%8ld",
             (long)maxregions);
    }

    if(pq != NULL)
    {
        (void) pq_close(pq);
        pq = NULL;
    }

    dump_stats();

    (void) closeulog();
}
/*
 * Handler for the signals registered in set_sigactions():
 *   SIGINT  - log, set intr and exit(0)
 *   SIGTERM - set done so the main loop finishes
 *   SIGUSR1 - request a statistics dump (stats_req)
 *   SIGUSR2 - roll the logging priority
 *   SIGCHLD - reap the pqact child without blocking
 * Any other signal is only logged at debug level.
 */
static void
signal_handler(int sig)
{
#ifdef SVR3SIGNALS
    /*
     * Some systems reset handler to SIG_DFL upon entry to handler.
     * In that case, we reregister our handler.
     */
    (void) signal(sig, signal_handler);
#endif
    if(sig == SIGINT)
    {
        unotice("Interrupt");
        intr = !0;
        exit(0);
    }
    else if(sig == SIGTERM)
    {
        udebug("SIGTERM");
        (void) sleep(0);
        done = !0;
    }
    else if(sig == SIGUSR1)
    {
        udebug("SIGUSR1");
        stats_req = !0;
    }
    else if(sig == SIGUSR2)
    {
        udebug("SIGUSR2");
        rollulogpri();
    }
    else if(sig == SIGCHLD)
    {
        (void) reap_act(WNOHANG);
        /* usually calls exit */
    }
    else
    {
        udebug("signal_handler: unhandled signal: %d", sig);
    }
}
/*
 * Register signal dispositions for the process:
 *   SIGHUP, SIGPIPE, SIGALRM            - ignored
 *   SIGTERM, SIGUSR1, SIGUSR2, SIGCHLD  - signal_handler, restarting
 *                                         system calls where SA_RESTART
 *                                         exists
 *   SIGINT                              - signal_handler, without restart
 */
static void
set_sigactions(void)
{
    static const int ignored[] = { SIGHUP, SIGPIPE, SIGALRM };
    static const int handled[] = { SIGTERM, SIGUSR1, SIGUSR2, SIGCHLD };
    struct sigaction sigact;
    size_t idx;

    sigemptyset(&sigact.sa_mask);
    sigact.sa_flags = 0;

    /* Ignore these */
    sigact.sa_handler = SIG_IGN;
    for(idx = 0; idx < sizeof(ignored) / sizeof(ignored[0]); idx++)
        (void) sigaction(ignored[idx], &sigact, NULL);

    /* Handle these */
#ifdef SA_RESTART /* SVR4, 4.3+ BSD */
    /* usually, restart system calls */
    sigact.sa_flags |= SA_RESTART;
#endif
    sigact.sa_handler = signal_handler;
    for(idx = 0; idx < sizeof(handled) / sizeof(handled[0]); idx++)
        (void) sigaction(handled[idx], &sigact, NULL);

    /* Don't restart after interrupt */
    sigact.sa_flags = 0;
#ifdef SA_INTERRUPT /* SunOS 4.x */
    sigact.sa_flags |= SA_INTERRUPT;
#endif
    (void) sigaction(SIGINT, &sigact, NULL);
}
/*
 * Delete old products from the queue epq.
 *
 * interval - minimum number of seconds between two real runs
 * age      - products older than this many seconds are candidates
 *            (main() converts the -a hours to seconds before calling)
 *
 * The class used for deletion matches every feedtype and every ident, with
 * a time range from TS_ZERO up to "now minus age".  The scan stops early
 * when done or stats_req becomes set.
 *
 * Returns ENOERR when it is too soon to run again, otherwise the last
 * status from pq_seqdel().
 */
static int
expire(pqueue *epq, const unsigned interval, const double age)
{
int status = ENOERR;
/* static so the class, and the time of the previous run kept in eclss.to, survive between calls */
static timestampt now;
static prod_class eclss;
static prod_spec spec;
timestampt ts;
timestampt cursor;
double diff = 0.;
double max_latency = 0.;
size_t nr;
if(eclss.psa.psa_val == 0)
{
/* first time */
eclss.from = TS_ZERO;
eclss.psa.psa_len = 1;
eclss.psa.psa_val = &spec;
spec.feedtype = ANY;
spec.pattern = ".*";
/* NOTE(review): the regcomp() result is not checked */
regcomp(&spec.rgx, spec.pattern, REG_EXTENDED|REG_NOSUB);
}
(void) set_timestamp(&now);
/* eclss.to holds the previous run's "now - age", so this compares the time since that run with interval */
if(d_diff_timestamp(&now, &eclss.to) < interval + age)
{
/* only run this routine every interval seconds */
udebug("not yet");
return ENOERR;
}
/* else */
eclss.to = now;
eclss.to.tv_sec -= age;
if(ulogIsDebug())
{
char cp[64];
sprint_timestampt(cp, sizeof(cp), &eclss.to);
udebug("to %s", cp);
}
/* start scanning from the oldest end of the queue */
pq_cset(epq, &TS_ZERO);
while(!done && !stats_req)
{
nr = 0;
/* NOTE(review): pq_seqdel() presumably steps to the next product, deletes it if it matches eclss, and reports the count in nr and its time in ts -- confirm */
status = pq_seqdel(epq, TV_GT, &eclss, 0, &nr, &ts);
switch(status) {
case ENOERR:
pq_ctimestamp(epq, &cursor);
diff = d_diff_timestamp(&cursor, &ts);
if(diff > max_latency)
{
max_latency = diff;
udebug("max_latency %.3f", max_latency);
}
if(nr == 0)
{
diff = d_diff_timestamp(&cursor, &eclss.to);
udebug("diff %.3f", diff);
if(diff > interval + max_latency)
{
udebug("heuristic depth break");
/* this break leaves the switch; the break after the switch then ends the loop */
break;
}
}
continue; /* N.B., other cases break and return */
case PQUEUE_END:
udebug("expire: End of Queue");
break;
case EAGAIN:
case EACCES:
udebug("Hit a lock");
break;
#if defined(EDEADLOCK) && EDEADLOCK != EDEADLK
case EDEADLOCK:
#endif
case EDEADLK:
uerror("%s", strerror(status));
break;
default:
uerror("pq_seqdel failed: %s (errno = %d)",
strerror(status), status);
break;
}
/* reached by every case except the "continue" above */
break;
}
return status;
}
/*
 * pqsurf: read products matching a feedtype/pattern from the input
 * product queue, split each into individual observations (split_prod ->
 * surf_split -> doOne) which are inserted into the output queue, and run
 * a "pqact" child against that output queue.  Old products are expired
 * from the output queue between polls.
 *
 * Options are described by usage().  Several of them (-v -x -l -f -o -t)
 * are also passed on to the pqact child.
 */
int
main(int ac, char *av[])
{
    const char *progname = ubasename(av[0]);
    char *logfname;
    prod_class clss;
    prod_spec spec;
    int status = 0;
    unsigned interval = DEFAULT_INTERVAL;
    int logoptions = (LOG_CONS|LOG_PID);
    double age = DEFAULT_AGE;
    /*
     * these are containers for the pqact args.
     * The array must keep a NULL entry after the last argument for
     * execvp(); it is zero-filled below and the option loop refuses to
     * fill it completely.
     */
    char *argv[32];
    const int argv_cap = (int)(sizeof(argv) / sizeof(argv[0]));
    int argc = 0;
    int toffset = TOFFSET_NONE;

    logfname = "";

    if(set_timestamp(&clss.from) != ENOERR) /* corrected by toffset below */
    {
        int errnum = errno;
        fprintf(stderr, "Couldn't set timestamp: %s",
            strerror(errnum));
        exit(1);
    }
    clss.to = TS_ENDT;
    clss.psa.psa_len = 1;
    clss.psa.psa_val = &spec;
    spec.feedtype = DEFAULT_FEEDTYPE;
    spec.pattern = DEFAULT_PATTERN;

    memset(argv, 0, sizeof(argv));
    argv[0] = "pqact";
    argc++;

    /*
     * Check the environment for some options.
     * May be overridden by command line switches below.
     */
    {
        const char *ldmpqfname = getenv("LDMPQFNAME");
        if(ldmpqfname != NULL)
            pqfname = ldmpqfname;
    }

    {
        extern int optind;
        extern int opterr;
        extern char *optarg;
        int ch;
        int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE));
        int fterr;
        char *conffilename = DEFAULT_SURF_CONFFILE;
        char *datadir = DEFAULT_SURF_DATADIR;

        usePil = 1;
        opterr = 1;

        while ((ch = getopt(ac, av, "vxl:d:f:p:q:Q:o:i:a:t:")) != EOF)
        {
            /*
             * Worst case from here: 2 entries for this option, 5 more
             * appended after the loop, and the NULL terminator.
             */
            if(argc + 2 + 5 + 1 > argv_cap)
            {
                fprintf(stderr, "%s: too many options\n", av[0]);
                usage(progname);
            }
            switch (ch) {
            case 'v':
                argv[argc++] = "-v";
                logmask |= LOG_MASK(LOG_INFO);
                break;
            case 'x':
                argv[argc++] = "-x";
                logmask |= LOG_MASK(LOG_DEBUG);
                break;
            case 'l':
                argv[argc++] = "-l";
                argv[argc++] = optarg;
                logfname = optarg;
                break;
            case 'd':
                datadir = optarg;
                break;
            case 'f':
                fterr = strfeedtypet(optarg, &spec.feedtype);
                if(fterr != FEEDTYPE_OK)
                {
                    fprintf(stderr, "%s: %s: \"%s\"\n",
                        av[0], strfeederr(fterr), optarg);
                    usage(progname);
                }
                argv[argc++] = "-f";
                argv[argc++] = optarg;
                break;
            case 'p':
                spec.pattern = optarg;
                /* compiled below */
                break;
            case 'q':
                pqfname = optarg;
                break;
            case 'Q':
                opqfname = optarg;
                break;
            case 'o':
                toffset = atoi(optarg);
                if(toffset == 0 && *optarg != '0')
                {
                    fprintf(stderr, "%s: invalid offset %s\n",
                         av[0], optarg);
                    usage(av[0]);
                }
                argv[argc++] = "-o";
                argv[argc++] = optarg;
                break;
            case 'i':
                interval = atoi(optarg);
                if(interval == 0 && *optarg != '0')
                {
                    fprintf(stderr, "%s: invalid interval \"%s\"\n",
                        av[0], optarg);
                    usage(av[0]);
                }
                /* N.B. -i just used for input queue. */
                break;
            case 'a':
                age = atof(optarg);
                if(age < 0.)
                {
                    (void) fprintf(stderr,
                        "age (%s) must be non negative\n",
                        optarg);
                    usage(av[0]);
                }
                break;
            case 't':
                /* pipe_timeo */
                argv[argc++] = "-t";
                argv[argc++] = optarg;
                break;
            case '?':
                usage(progname);
                break;
            }
        }

        (void) setulogmask(logmask);

        status = regcomp(&spec.rgx,
            spec.pattern,
            REG_EXTENDED|REG_NOSUB);
        if(status != 0)
        {
            fprintf(stderr, "Bad regular expression \"%s\"\n",
                spec.pattern);
            usage(av[0]);
        }

        if(ac - optind == 1)
            conffilename = av[optind];

        /* the child reads the OUTPUT queue of this process */
        argv[argc++] = "-d";
        argv[argc++] = datadir;
        argv[argc++] = "-q";
        argv[argc++] = opqfname;
        argv[argc++] = conffilename;

        /* hours to seconds */
        age *= 3600.;
    }

    if(toffset != TOFFSET_NONE)
    {
        clss.from.tv_sec -= toffset;
    }
    else
    {
        clss.from.tv_sec -= (age - interval);
    }

    /*
     * Set up error logging.
     * N.B. log ident is the remote
     */
    (void) openulog(progname,
        logoptions, LOG_LDM, logfname);
    unotice("Starting Up (%d)", getpgrp());

    /*
     * register exit handler
     */
    if(atexit(cleanup) != 0)
    {
        serror("atexit");
        exit(1);
    }

    /*
     * set up signal handlers
     */
    set_sigactions();

    /*
     * Open the output product queue
     */
    status = pq_open(opqfname, PQ_DEFAULT, &opq);
    if(status)
    {
        uerror("pq_open failed: %s: %s\n",
            opqfname, strerror(status));
        exit(1);
    }

    act_pid = run_child(argc, argv);
    if(act_pid == (pid_t)-1)
        exit(1);

    /*
     * Open the input product queue
     */
    status = pq_open(pqfname, PQ_READONLY, &pq);
    if(status)
    {
        uerror("pq_open failed: %s: %s\n",
            pqfname, strerror(status));
        exit(1);
    }

    if(toffset == TOFFSET_NONE)
    {
        /* Jump to the end of the queue */
        timestampt sav;
        sav = clss.from;
        clss.from = TS_ZERO;
        (void) pq_last(pq, &clss, NULL);
        clss.from = sav;
    }
    else
    {
        pq_cset(pq, &clss.from);
    }

    if(ulogIsVerbose())
    {
        char buf[1984];
        uinfo("%s",
             s_prod_class(buf, sizeof(buf), &clss));
    }

    while(!done)
    {
        if(stats_req)
        {
            dump_stats();
            stats_req = 0;
        }

        status = pq_sequence(pq, TV_GT, &clss, split_prod, NULL);

        switch(status) {
        case 0: /* no error */
            continue; /* N.B., other cases sleep */
        case PQUEUE_END:
            udebug("surf: End of Queue");
            break;
        case EAGAIN:
        case EACCES:
            udebug("Hit a lock");
            break;
        default:
            uerror("pq_sequence failed: %s (errno = %d)",
                strerror(status), status);
            exit(1);
            break;
        }

        /* an interval of 0 means a single pass over the queue */
        if(interval == 0)
        {
            break;
        }

        (void) expire(opq, interval, age);

        pq_suspend(interval);
    }

    /*
     * TODO: how can we determine that pqact has finished
     * the work in opq?
     */
    sleep(5);
    exit(0);
}