This archive contains answers to questions sent to Unidata support through mid-2025. Note that the archive is no longer being updated. We provide the archive for reference; many of the answers presented here remain technically correct, even if somewhat outdated. For the most up-to-date information on the use of NSF Unidata software and data services, please consult the Software Documentation first.
Greg, I didn't send the latest code; here's the code that will work. Use these files in the src/pqsurf directory. Robb... Output: zero.unidata.ucar.edu.rkambic> m 00030617.wmo METAR^M KHOX 1720Z 061719Z AUTO 20012KT 16/00 A2982 RMK A01 T0161 PCPN 000=^M METAR^M KCBK 1729Z 061726Z AUTO 18016G26KT 16/M01 A2979 RMK A01 T0161 PCPN 000=^M METAR^M KCBK 1744Z 061743Z AUTO 20017G24KT 17/01 A2979 RMK A01 T0167 PCPN 000=^M METAR^M KHOX 1750Z 061749Z AUTO 18012KT 18/M02 A2983 RMK A01 T0178 PCPN 000=^M METAR^M KTRB 1756Z 061752Z 22016KT 18/M02 RMK AO1 T01831017=^M METAR^M KYMA 1757Z 061753Z 23008KT 14/M06 RMK AO1 T01441061=^M METAR^M KGNL 1758Z 061758Z AUTO 27008KT 18/01 A2988 RMK A01 T01800010 PCPN000=^M METAR^M KNRN 1758Z 061758Z AUTO 16010KT 17/00 A2986 RMK A01 T017000000 PCPN000=^M METAR^M KSTR 1758Z 061758Z AUTO 14012KT 16/M01 A2978 RMK A01 T01601010 PCPN000=^M METAR^M KCBK 1759Z 061758Z AUTO 20019KT 17/01 A2980 RMK A01 T0172 PCPN 000=^M On Fri, 3 Mar 2000, Greg Thompson wrote: > > Robb, > > thanks so much for helping with the FXUS43 issue. I compiled your latest > code into our version of pqsurf but I'm still not getting any data with > those headers into my "metar_gdb" files. Here's what I'm using in the > ldmd.conf file: > > exec "pqsurf -l /data/ldm/logs/pqsurf.log -f WMO -p ^(S[AP]|FXUS43) -Q > /data/ldm... > > and here's the important line from pqsurf.conf: > > WMO ^metar (....) ([0-3][0-9])([0-2][0-9]) DBFILE /data/ldm/ddp........ > > Also, there are new METARs coming out of Cheyenne, WY and look like: > > 344 ^M^M > SAUS45 KCYS 032002^M^M > MTRCTD^M^M > METAR KCTD 031945Z AUTO 23010G13KT 07/M06 RMK AO1 RH% = 42 ^M^M > ^M^M > ^M^M > ^M^M > ^C > > Even though these clearly have "SA" as their starting header, they are > also not appearing in my "metar_gdb" files. I'd very much like to get > these into our system appropriately. Can you help? > > Thanks! 
> > -- > +--------------------------------------------------------------+ > | Greg Thompson http://www.rap.ucar.edu/staff/gthompsn/ | > | Research Applications Program | > | (303) 497-2805 National Center for Atmospheric Research | > | (fax) -8401 P.O. Box 3000 Boulder, CO 80307-3000 | > +--------------------------------------------------------------+ > =============================================================================== Robb Kambic Unidata Program Center Software Engineer III Univ. Corp for Atmospheric Research address@hidden WWW: http://www.unidata.ucar.edu/ ===============================================================================
/*
 *   Copyright 1993, University Corporation for Atmospheric Research
 *   See ../COPYRIGHT file for copying and redistribution conditions.
 */
/* $Id: wmo_header.h,v 1.11 1999/06/03 21:13:26 rkambic Exp $ */

/*
 * Declarations for parsing and printing the WMO abbreviated heading
 * ("TTAAii CCCC YYGGgg [BBB]") found at the start of a bulletin.
 */
#ifndef _WMO_HEADER_H_
#define _WMO_HEADER_H_

#include "xbuf.h"
#include "dtime.h"

/* Kind of surface bulletin, as classified by decode_type(). */
typedef enum {
	MESSAGE_TYPE_UNKNOWN = 0 ,
	SYNOP,	/* FM 12 */
	SHIP,	/* FM 13 */
	METAR,	/* FM 15 or SAO!! */
	SPECI	/* FM 16 */
} message_type_t;

/* Meaning of the optional BBB group of the heading. */
typedef enum {
	ORIGINAL = 0,
	RTD ,	/* delayed */
	COR ,	/* correction */
	AMD ,	/* amended */
	PIE	/* pieces */
} retransmit_t;

/* abbreviated heading, 2.3.2 */
typedef struct {
	char TT[3];	/* Tsub1Tsub2 : Data type and/or form */
	char AA[3];	/* Asub1Asub2 : Geograph. and/or time */
	int ii;		/* two digit bulletin number */
	char CCCC[5];	/* station of origin or compilation */
	char PIL[10];	/* AFOS-like product id; empty string when none was found */
	dtime *time;	/* YYGGgg of the heading */
	retransmit_t retransmit; /* BBB delay, correction or amendment ind */
	int retrans_seq; /* the sequence from BBB */
} wmo_header_t;

/* Release a heading and the dtime it points to. NULL is accepted. */
extern void free_wmo_header(wmo_header_t *hdr);
/* NOTE(review): no definition of new_wmo_header() is visible alongside this header. */
extern wmo_header_t *new_wmo_header(char **line);
/* Parse a heading from `buf' into `hdr'; returns `hdr', or NULL at end of buffer. */
extern wmo_header_t *get_wmo_header(xbuf *buf, wmo_header_t *hdr);
/* Format a heading as text; the result lives in a static buffer. */
extern char * s_wmo_header(wmo_header_t *hdr);
/* extern int fprint_wmo_header(FILE *fp , const wmo_header_t *hdr); */
/* Text form of the BBB group ("RRx", "COR", "Pxx", ...). */
extern char *sRetransmit(wmo_header_t *hdr );
/* Name of a message_type_t value. */
extern char *sMessage_type(message_type_t type);
/* Classify a bulletin from its TT, AA and PIL strings. */
extern message_type_t decode_type(char *tt, char *aa, char *PIL);

#endif /* _WMO_HEADER_H_ */
/* * Copyright 1993, University Corporation for Atmospheric Research * See ../COPYRIGHT file for copying and redistribution conditions. */ /* $Id: wmo_header.c,v 1.30 1999/06/21 22:15:21 rkambic Exp $ */ #include <stdio.h> #include <string.h> #include "wmo_header.h" #include "alloc.h" #include "tokens.h" #include "xbuf.h" int usePil=0; void free_wmo_header(wmo_header_t *hdr) { if(hdr == NULL) return; free_dtime(hdr->time); free(hdr); } static void clear_wmo_header(wmo_header_t *hdr) { hdr->TT[0] = hdr->AA[0] = hdr->ii = hdr->CCCC[0] = hdr->PIL[0] = 0; hdr->retransmit = ORIGINAL; hdr->retrans_seq = MESSAGE_TYPE_UNKNOWN; clear_dtime(hdr->time); } wmo_header_t * get_wmo_header(xbuf *buf, wmo_header_t *hdr) { clear_wmo_header(hdr); if( get_wstr(buf, hdr->TT, 2) == EOB ) return NULL; if( get_wstr(buf, hdr->AA, 2) == EOB ) return NULL; if( get_wnum(buf, &hdr->ii, 2) == EOB ) return NULL; if(hdr->ii == -1) hdr->ii = 0; if( get_wstr(buf, hdr->CCCC, 4) == EOB ) return NULL; { int YY, GG, gg; if( get_wnum(buf, &YY, 2) == EOB ) return NULL; if( get_wnum(buf, &GG, 2) == EOB ) return NULL; if( get_wnum(buf, &gg, 2) == EOB ) return NULL; /* uses current time on parse errors */ hdr->time = set_dtime(hdr->time, YY,GG,gg); } /* decode BBB feild */ { /* inline */ char line[16]; char pilstr[16]; int allnum, nonalph, ich; if( get_wline(buf, line, sizeof(line)) == EOB ) return NULL; { /* inline inner */ const char *cp = &line[0]; const char *const end = &line[strlen(line)]; /* N.B. 
twisted flow here */ for(cp = &line[0]; *cp != 0 && *cp != CR && cp + 2 < end; cp++) { switch( cp[0] ) { case 'R' : if(cp[1] == 'R') { hdr->retransmit = RTD; hdr->retrans_seq = cp[2]; } else if(cp[1] == 'T') { hdr->retransmit = RTD; } break; case 'C' : if(cp[1] == 'C') { hdr->retransmit = COR; hdr->retrans_seq = cp[2]; } else if(cp[1] == 'O') { hdr->retransmit = COR; } break; case 'A' : if(cp[1] == 'A') { hdr->retransmit = AMD; hdr->retrans_seq = cp[2]; } else if(cp[1] == 'M') { hdr->retransmit = AMD; } break; case 'P' : hdr->retransmit = PIE; hdr->retrans_seq = (cp[1] - 'A') * 26 + (cp[2] - 'A'); goto done; case 0 : break; default : continue; /* loop, look some more */ } /* only arrrive here if we got an acceptable BBB string */ if(hdr->retrans_seq == 0) { int tmp; /* see if they used old style, eg RTD01 */ cp += 3; tmp = atoi(cp); /* Attachment II-14 */ if(tmp >= 0 && tmp < 24 ) hdr->retrans_seq = tmp + 'A'; else hdr->retrans_seq = 'Y'; } done: break; } /* end for */ } /* end inline inner */ if((usePil == 1) && ( get_wstr(buf, pilstr, sizeof(pilstr)) != EOB )) { if((pilstr[0] != NULL) && ( get_wline(buf, line, sizeof(line)) != EOB )) { if((line[0] == NULL)&&(strlen(pilstr) > 4)&&(strlen(pilstr) < 7)) { nonalph = 0; allnum = 1; for(ich = 0;ich < strlen(pilstr);ich++) { if(isalnum(pilstr[ich]) == 0) nonalph = 1; if(isdigit(pilstr[ich]) == 0) allnum = 0; } if((nonalph == 0)&&(allnum == 0)) sprintf(hdr->PIL," /p%s\0",pilstr); } else if((line[0] == NULL)&&(strncmp(pilstr,"^NMC",4) == 0)&& (strlen(pilstr+4) > 4)&&(strlen(pilstr+4) < 7)) sprintf(hdr->PIL," /p%s\0",pilstr+4); } } } /* end inline */ return hdr; } char * sRetransmit(wmo_header_t *hdr) { static char buf[4]; int seq = hdr->retrans_seq; if(hdr->retransmit == PIE) { if(seq < 0 || seq > 675) /* 26 * 26 -1 */ seq = 675; } else if(seq < 'A' || seq > 'Z') seq = 0; switch ( hdr->retransmit ) { case RTD : if(seq) { sprintf(buf,"RR%c", seq); return buf; } /* else */ return "RTD"; case COR : if(seq) { 
sprintf(buf,"CC%c", seq); return buf; } /* else */ return "COR"; case AMD : if(seq) { sprintf(buf,"AA%c", seq); return buf; } /* else */ return "AMD"; case PIE : { const char c1 = seq/26 + 'A'; const char c2 = seq%26 + 'A'; sprintf(buf,"P%c%c", c1, c2); return buf; } } /* default */ return NULL; } char * s_wmo_header(wmo_header_t *hdr) { #ifndef KEYSIZE #define KEYSIZE 255 #endif static char sp[KEYSIZE+1]; char *cp = sp; (void) memset(sp,0,sizeof(sp)); sprintf(cp, "%2s%2s%02d %4s", hdr->TT, hdr->AA, hdr->ii, hdr->CCCC); cp += 11; if(hdr->time != NULL) { sprintf(cp, " %02d%02d%02d", hdr->time->mday, hdr->time->hour, hdr->time->min ); } else { sprintf(cp, " DDHHMM"); } cp += 7; if(hdr->retransmit != ORIGINAL) sprintf(cp, " %s", sRetransmit(hdr)); if(hdr->PIL[0] != NULL) strcat(sp,hdr->PIL); return sp; } #if 0 int fprint_wmo_header(FILE *fp, const wmo_header_t *hdr) { #if 1 fprintf(fp, "%2s%2s%02d %4s", hdr->TT, hdr->AA, hdr->ii, hdr->CCCC); if(hdr->time != NULL) { fprintf(fp, " %02d%02d%02d", hdr->time->mday, hdr->time->hour, hdr->time->min ); } else { fprintf(fp, " DDHHMM"); } if(hdr->retransmit != ORIGINAL) fprintf(fp, " %s", sRetransmit(hdr)); #else fputs( s_wmo_header(hdr) , fp ); #endif return ferror(fp); } #endif #if USED static void clear_wmo_header(wmo_header_t *hdr) { hdr->TT[0] = hdr->AA[0] = hdr->ii = hdr->CCCC[0] = hdr->retransmit = hdr->retrans_seq = 0; free_dtime(hdr->time); hdr->time = NULL; } #endif /* USED */ char * sMessage_type(message_type_t type) { switch(type) { case SYNOP : return "SYNOP"; case SHIP : return "SHIP"; case METAR : return "METAR"; case SPECI : return "SPECI"; case MESSAGE_TYPE_UNKNOWN : return "MESSAGE_TYPE_UNKNOWN"; } /* default */ return NULL; } message_type_t decode_type(char *tt, char *aa, char *pil) { if( pil[0] == '/' && pil[1] == 'p' && pil[2] == 'M' && pil[3] == 'T') { if( pil[4] == 'R' || pil[4] == 'T') { uerror("HDR + PIL: %s%s %s", tt, aa, pil ) ; return METAR; } } if( tt[0] == 'S' || tt[0] == 'F') { /* Table B1 */ 
switch( tt[1] ) { case 'I' : case 'M' : case 'N' : if(aa[0] == 'W' || aa[0] == 'V' ) { /* table C2 */ switch( aa[1] ) { case 'A' : case 'B' : case 'C' : case 'D' : case 'E' : case 'F' : case 'J' : case 'X' : return SHIP; } /* else */ } /* else */ return SYNOP; case 'A' : return METAR; /* might be an 'sao' */ case 'X' : if(*aa == 'U' && *(aa +1) == 'S') return METAR; /* 'sao' */ break; case 'P' : return SPECI; } /* else */ } /* else */ return MESSAGE_TYPE_UNKNOWN; }
/*
 *   Copyright 1993, University Corporation for Atmospheric Research
 *   See ../COPYRIGHT file for copying and redistribution conditions.
 */
/* $Id: surf_split.c,v 1.30 1999/12/02 23:21:26 rkambic Exp $ */

#include <ldmconfig.h>
#include <stdio.h>
#include <ctype.h>
#include <string.h>
#include "ldm.h"
#include "ulog.h"
#include "wmo_header.h"
#include "tokens.h"
#include "xbuf.h"
#include "surface.h" /* wind_units_t, CALL_SIGN_LEN */
#include "md5.h"

/* storage for the MD5 context, declared as doubles to get strict alignment */
static double md5ctx[16]; /* 88 would be big enough */
static MD5_CTX *md5ctxp = (MD5_CTX *)md5ctx;


/*
 * Read a YYGG (day, hour) group from `buf' into `time'; minutes become 0.
 * Returns the status of the last read; negative means the read failed
 * and `time' was not touched.
 */
static int
get_yygg(xbuf *buf, dtime *time)
{
	int status;
	int YY = -1;
	int GG = -1;

	if((status = dget_wnum(buf, &YY, 2)) < 0) return status;
	if((status = dget_num(buf, &GG, 2)) < 0) return status;
	set_dtime(time, YY, GG, 0);
	return status;
}


/*
 * For METAR, check if the HHMMZ time string is present.
 * Leading white space is consumed; the time group itself is not.
 * Returns 1 if the next five characters are four digits and 'Z', else 0.
 */
static int
whas_yyggZ(xbuf *buf)
{
	int ch;

	/* skip white space */
	do{
		ch = nextc(buf);
	}while((isascii(ch) && !isgraph(ch)));
	unnextc(buf,ch);

	if(buf->cnt < 5)
		return 0; /* not enough characters */
	if(buf->get[4] != 'Z'
			|| !isdigit(buf->get[3])
			|| !isdigit(buf->get[2])
			|| !isdigit(buf->get[1])
			|| !isdigit(buf->get[0]))
		return 0;
	return 1; /* passed */
}


/*
 * For METAR, check if "NIL": returns 1 when the last three characters
 * of the buffer are "NIL", else 0.
 */
static int
has_NIL(xbuf *buf)
{
	char nilstr[] = "NIL";
	const long niln = (long)(sizeof(nilstr) - 1);
	char *np;

	/* a buffer shorter than "NIL" cannot end with it; do not index before base */
	if((long)buf->bufsiz < niln)
		return 0;

	np = (char *)&buf->base[buf->bufsiz - 1 - (sizeof(nilstr) -1 -1)];
	if(strncmp(np, nilstr, sizeof(nilstr) -1) == 0)
		return 1;
	return 0;
}


/*
 * For METAR, get the bulletin time, if possible.
 * `time' is left alone when no HHMMZ group is next in `buf'.
 */
static void
get_wyyggZ(xbuf *buf, dtime *time)
{
	if(!whas_yyggZ(buf))
		return;
	(void)get_yygg(buf, time);
	(void)nextc(buf); /* eat the 'Z' */
	return;
}


/*
 * Takes a WMO format product which is a
 * SAO, SYNOP, SHIP, METAR, or SPECI message, splits it into
 * individual observations. The observations are each encapsulated in a
 * new product which inherits most of its description from the
 * original product.
 * The new product pkey is derived from the observation type
 * and has the following form:
 *
 *	SAO - "sao tt ccc ddhhmm"
 *		where:
 *			tt is SA, SP or RS
 *			ccc is the station ID like SFO, LXV, etc
 *			ddhhmm is the time stamp.
 *
 *	SYNOP - "aaxx nnnnn ddhhmm"
 *		where:
 *			nnnnn is the WMO station id (5 digit number)
 *
 *	SHIP - "bbxx c* ddhhmm"
 *		where:
 *			c* is the call sign
 *
 *	METAR - "metar cccc ddhhmm"
 *		where:
 *			cccc is the call sign
 *
 *	SPECI - "speci cccc ddhhmm"
 *
 * The new product sequence number is original sequence number times 1000
 * plus the sequence of the individual observation within the product.
 *
 * 'doit' is called on each of the new products. It is presumed
 * this function return zero upon success.
 *
 * Returns the number of successful calls to 'doit', eg, the
 * number of splits. Returns -1 on error.
 */
int
surf_split(const prod_info *infop, const void *datap,
	int (*doit)(const prod_info *, const void *))
{
	wmo_header_t hdr;
	message_type_t mtype;
	dtime dt;
	xbuf buf[1];
	unsigned char dbuf[8192]; /* TODO */
	int nsplit = 0;

	enum {
		SURFACE_BOGUS ,
		AAXX,
		US_AAXX,
		BBXX,
		SAO,
		sMETAR,
		sSPECI
	} subtype = SURFACE_BOGUS;

	hdr.time = &dt;

	if(infop->sz > sizeof(dbuf))
		return -1; /* TODO: too big */

	/* work on a private copy of the product */
	memcpy(dbuf, datap, infop->sz);

	if( cbuftoxbuf(buf, (unsigned char *)dbuf,
			infop->sz) == NULL)
		return -1;

	skipline(buf, 4); /* SOH */
	skipline(buf, 12); /* start */

	if( get_wmo_header(buf, &hdr) == NULL)
	{
		return -1;
	}
#if DEBUG
	fputs("\t", stderr);
	fprint_wmo_header(stderr, &hdr);
	fputs("\n", stderr);
#endif

	mtype = decode_type(hdr.TT,hdr.AA,hdr.PIL);

	/* #### */
	{
	/* big enough for the 4 character type tags and for a call sign */
	char cbuf[CALL_SIGN_LEN + 1 > 8 ? CALL_SIGN_LEN + 1 : 8];
	int digit;
	dtime time;
	wind_units_t wind_units = WIND_UNAVAIL;

	time = *hdr.time; /* default the ob time to the time in the header */

	/* delve into section 0 */

	switch(mtype) {
	case SYNOP :
		if(get_wstr(buf, cbuf, 1) < 0 ) return -1;
		if(cbuf[0] == 'A')
		{
			subtype = AAXX;
			if(get_str(buf, &cbuf[1], 3) < 0 ) return -1;
			if( cbuf[3] != 'X' )
			{
				/* punt */
				uerror("surface_split: Unknown type: %s\n", cbuf);
				return 0;
			}
			if(get_yygg(buf, &time) < 0 ) return -1; /* YYGG */
			if(dget_num(buf, &digit, 1) < 0 ) return -1; /* isubw */
			if(digit >= 0 && digit <= 4)
				wind_units = (wind_units_t)digit;
		}
		else if(isascii((unsigned char)cbuf[0])
				&& isdigit((unsigned char)cbuf[0])) /* US Stations 7NNNN */
		{
			unnextc(buf,cbuf[0]);
			subtype = US_AAXX;
			/*
			 * Some US reports leave off AAXX YYGGisubw, so we use the
			 * time from the wmo header.
			 */
			wind_units = KNOTS;
		}
		else
		{
			unnextc(buf,cbuf[0]);
			return 0; /* ?? */
		}
		break;
	case SHIP :
		if(get_wstr(buf, cbuf, 4) < 0 ) return -1;
		if(cbuf[0] == 'B')
		{
			if( cbuf[3] != 'X' )
			{
				/* punt */
				uerror("surface_split: Unknown type: %s\n", cbuf);
				return 0;
			}
			subtype = BBXX;
			/* get time below */
		}
		else
		{
			unnextc(buf,cbuf[0]);
			return 0;
		}
		break;
	case METAR :
		if(whasSTR(buf, "METAR"))
		{
			subtype = sMETAR;
			get_wyyggZ(buf, &time);
		}
		else if(hdr.PIL[0] == 'M' && hdr.PIL[1] == 'T')
		{
			/*
			 * NOTE(review): get_wmo_header() stores PIL as " /pXXXXXX",
			 * so PIL[0] looks like it is never 'M' here -- confirm.
			 */
			uerror("HDR + PIL: %s%s %s", hdr.TT, hdr.AA, hdr.PIL ) ;
			/* skip 6 char PIL */
			if(get_wstr(buf, cbuf, CALL_SIGN_LEN) < 0) return 0;
			subtype = sMETAR;
			get_wyyggZ(buf, &time);
		}
		else
			subtype = SAO; /* may actually be a METAR, check below */
		break;
	case SPECI :
		if(whasSTR(buf, "SPECI"))
		{
			subtype = sSPECI;
			get_wyyggZ(buf, &time);
		}
		break;
	default :
		uerror("surface_split: Can't handle %s", sMessage_type(mtype) );
		uerror("HDR + PIL: %s%s %s", hdr.TT, hdr.AA, hdr.PIL ) ;
		return -1;
	}

	{ /* while block */
	static char newkey[KEYSIZE];
	xbuf subbuf[1];
	prod_info newinfo = *infop;
#define MAX_SURF_LEN 511
#undef MIN
#define MIN(a,b) ((a) <= (b) ? (a) : (b))
	char pbuf[MAX_SURF_LEN + 1];
	int l1, l2;
	static char ident[CALL_SIGN_LEN+1];
	static char type[4];
	u_int subseq = infop->seqno * 1000;
	unsigned char *pp;

	/* one pass per '='-terminated observation */
	while( get_weqxbuf(buf, subbuf) > 0 )
	{
		(void)memset(newkey,0,KEYSIZE);
		(void)memset(pbuf,0,MAX_SURF_LEN + 1);
		(void)memset(ident,0,CALL_SIGN_LEN+1);
		pp = subbuf->base;	/* start of the text copied into the new product */

		switch(subtype) {
		case AAXX :
		case US_AAXX :
			strcpy(newkey, "aaxx ");
			strcpy(pbuf, "AAXX");
			sprintf(&pbuf[strlen(pbuf)], " %02d%02d%1d\r\r\n",
				time.mday, time.hour, (int)wind_units);
			/* WMO station no. */
			if(get_wstr(subbuf, ident, 5) < 0)
				continue;
			strcat(newkey, ident);
			break;
		case BBXX :
			strcpy(newkey, "bbxx ");
			strcpy(pbuf, "BBXX\r\r\n");
			/* call sign */
			if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
				continue;
			strcat(newkey, ident);
			if(get_yygg(subbuf, &time) < 0)
				continue; /* YYGG */
			break;
		case sSPECI :
			/* call sign */
			if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
				continue;
			if(strcmp(ident, "SPECI") == 0)
			{
				/* They package each ob with a tag */
				pp = (subbuf->get +1);
				if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
					continue;
			}
			if(!whas_yyggZ(subbuf))
			{
				/* Have to insert the date */
				sprintf(pbuf, "SPECI\r\r\n%s %02d%02dZ ", ident,
					time.hour, time.min);
				pp = subbuf->get;
			}
			else
				strcpy(pbuf, "SPECI\r\r\n");
			strcpy(newkey, "speci ");
			strcat(newkey, ident);
			break;
		case sMETAR :
			if(has_NIL(subbuf))
				continue;
			/* call sign */
			if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
				continue;
			if(strcmp(ident, "METAR") == 0)
			{
				/* They package each ob with a tag */
				pp = (subbuf->get +1);
				if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
					continue;
			}
			if(!whas_yyggZ(subbuf))
			{
				/* Have to insert the date */
				sprintf(pbuf, "METAR\r\r\n%s %02d%02dZ ", ident,
					time.hour, time.min);
				pp = subbuf->get;
			}
			else
				strcpy(pbuf, "METAR\r\r\n");
			strcpy(newkey, "metar ");
			strcat(newkey, ident);
			break;
		case SAO :
			/* call sign */
			if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
				continue;
			if(hdr.AA[0] == 'U' && hdr.AA[1] == 'S'
					&& strlen(ident) == 6)
			{
				/* skip 6 char US "AFOS code" */
				if(get_wstr(subbuf, ident, CALL_SIGN_LEN) < 0)
					continue;
			}
			/* SA, SP, RS, USP or XP */
			if(get_wstr(subbuf, type, 3) < 0)
				continue;
			if((type[0] == 'S'
					&& (type[1] == 'A' || type[1] == 'P'))
				|| (type[0] == 'R' && type[1] == 'S')
				|| (type[0] == 'U' && type[1] == 'S'
					&& type[2] == 'P')
				|| (type[0] == 'X' && type[1] == 'P')
				|| (type[0] == 'T'
					&& (type[1] == 'A' || type[1] == 'S'))
				)
			{
				strcpy(newkey, "sao ");
				strcat(newkey, type);
				strcat(newkey, " ");
				strcat(newkey, ident);
			}
			else if(isdigit((unsigned char)type[0])
					&& isdigit((unsigned char)type[1]))
			{
				/* it is a METAR really */
				/* N.B. the rest of the bulletin is then split as METAR too */
				subtype = sMETAR;
				strcpy(newkey, "metar ");
				strcat(newkey, ident);
				strcpy(pbuf, "METAR\r\r\n");
			}
			else
				continue; /* don't know what it is, "NIL=" */
			break;
		}

		/* safety net */
		if(strlen(ident) == 0)
		{
			continue;
		}
		/* else */

		sprintf(&newkey[strlen(newkey)], " %02d%02d%02d",
			time.mday, time.hour, time.min);
		if(hdr.retransmit != ORIGINAL)
			sprintf(&newkey[strlen(newkey)], " %s",
				sRetransmit(&hdr));
		newinfo.ident = newkey;
		newinfo.seqno = ++subseq;

		l1 = strlen(pbuf);
		{
			/*
			 * Bytes of the observation left at `pp'.  Computed in a
			 * signed type and clamped at zero so a `pp' beyond the
			 * end of the sub-buffer can never yield a negative or
			 * wrapped count for strncat().
			 */
			const long avail = (long)subbuf->bufsiz
					- (long)(pp - subbuf->base);
			const long room = (long)(MAX_SURF_LEN - l1 - 4);
			l2 = (avail < 0) ? 0 : (int)MIN(room, avail);
		}
		/* N.B.: silent truncation */
		strncat(pbuf, (char *)pp, l2 );
		strcat(pbuf,"=\r\r\n");

		newinfo.sz = l1 + l2 + 4;

#if DEBUG
		fprintf(stderr,"\t\t%s\n", newinfo.ident);
#endif
#if PRINT
		{
			char *cp = pbuf;
			char *end = &cp[newinfo.sz];
			while(cp < end)
			{
				putc(*cp, stderr);
				cp++;
			}
		}
		putc('\n', stderr);
#endif
		MD5Init(md5ctxp);
		MD5Update(md5ctxp, (const unsigned char *)pbuf, newinfo.sz);
		MD5Final(newinfo.signature, md5ctxp);

		/*
		 * process the single ob in the requested fashion
		 */
		if((*doit)(&newinfo, pbuf) == 0)
			nsplit++;
	} /* end while */
#if PRINT
	putc('\n', stderr);
#endif
	} /* end while block */

	} /* end #### block */

	return nsplit;
}
/*
 *   Copyright 1993, University Corporation for Atmospheric Research
 *   See ../COPYRIGHT file for copying and redistribution conditions.
 */
/* $Id: pqsurf.c,v 1.51 1999/04/02 23:16:35 davis Exp $ */

/*
 * Reads products from the input product queue, splits surface bulletins
 * into single observations with surf_split(), inserts them into an
 * output queue, and runs a child "pqact" on that output queue.
 */

/*
 * Need to create a queue before running this:
 * pqcreate -c -s 2M -S 13762 /usr/local/ldm/data/pqsurf.pq
 */

#include <ldmconfig.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rpc/rpc.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <sys/wait.h>
#include <assert.h>
#include <regex.h>
#include "ldm.h"
#include "atofeedt.h"
#include "ldmprint.h"
#include "ulog.h"
#include "pq.h"
#include "paths.h"
#include "surface.h"

#ifdef NO_ATEXIT
#include "atexit.h"
#endif

extern int usePil; /* 1/0 flag to signal use of AFOS like pil identifier */

#ifndef DEFAULT_INTERVAL
#define DEFAULT_INTERVAL 15
#endif

static volatile int done = 0;
static volatile int intr = 0;
static volatile int stats_req = 0;

#ifndef DEFAULT_PATTERN
#define DEFAULT_PATTERN "^S[AIMNP]"
#endif
#ifndef DEFAULT_FEEDTYPE
#define DEFAULT_FEEDTYPE (IDS|DDS)
#endif

static const char *pqfname = DEFAULT_QUEUE;
static pqueue *pq = NULL;

/* set in paths.h by configure */
#ifndef DEFAULT_SURF_OUTQUEUE
#define DEFAULT_SURF_OUTQUEUE "/usr/local/ldm/data/pqsurf.pq"
#endif
static char *opqfname = DEFAULT_SURF_OUTQUEUE;
static pqueue *opq = NULL;

/* set in paths.h by configure */
#ifndef DEFAULT_SURF_CONFFILE
#define DEFAULT_SURF_CONFFILE "/usr/local/ldm/etc/pqsurf.conf"
#endif

/* set in paths.h by configure */
#ifndef DEFAULT_SURF_DATADIR
#ifndef DEFAULT_DATADIR
#define DEFAULT_SURF_DATADIR "/usr/local/ldm"
#else
#define DEFAULT_SURF_DATADIR DEFAULT_DATADIR
#endif
#endif

#ifndef DEFAULT_PIPE_TIMEO
#define DEFAULT_PIPE_TIMEO 60
#endif
#ifndef DEFAULT_AGE
#define DEFAULT_AGE (1. + (double)(DEFAULT_INTERVAL)/3600.)
#endif

/* most words main() may hand to pqact; the array keeps one more slot for NULL */
#define PQACT_MAX_ARGS 32

/*
 * pid of the pqact child, or -1 when there is none.  Starts at -1 so
 * that cleanup() run before the fork never signals pid 0, which would
 * address the whole process group.
 */
static pid_t act_pid = -1;


/*
 * Set to non-root privilege if possible.
 * Do it in such a way that it is safe to fork.
 * TODO: this is duplicated from ../server/priv.c
 */
static void
endpriv(void)
{
	const uid_t euid = geteuid();
	const uid_t uid = getuid();

	/* if either euid or uid is unprivileged, use it */
	if(euid > 0)
		setuid(euid);
	else if(uid > 0)
		setuid(uid);

	/* else warn??? or set to nobody??? */
}


/*
 * Fork and exec argv[0] with the NULL-terminated vector `argv'.
 * Returns the child's pid in the parent, -1 if the fork failed.
 */
static pid_t
run_child(int argc, char *argv[])
{
	pid_t pid;

	if(ulogIsDebug())
	{
		/* build "word word word" for the log, truncating instead of overflowing */
		char command[1024];
		size_t used = 0;
		int ii;

		command[0] = 0;
		for(ii = 0; ii < argc; ii++)
		{
			const size_t len = strlen(argv[ii]);

			/* room for a separator, the word and the terminator */
			if(used + len + 2 > sizeof(command))
				break;
			if(ii > 0)
				command[used++] = ' ';
			memcpy(&command[used], argv[ii], len);
			used += len;
			command[used] = 0;
		}
		udebug("exec'ing: \"%s\"", command);
	}

	pid = fork();
	if(pid == -1)
	{
		serror("run_child: fork failed");
		return pid;
	}

	if(pid == 0)
	{	/* child */

		(void)signal(SIGCHLD, SIG_DFL);
		(void)signal(SIGTERM, SIG_DFL);

		/* keep same descriptors as parent */

		/* don't let child get real privilege */
		endpriv();

		(void) execvp(argv[0], &argv[0]);
		serror("run_child: execvp: %s", argv[0]);
		_exit(127);
	}
	/* else, parent */

	return pid;
}


static int nprods = 0;
static int nsplit = 0;
static int ndups = 0;

/* Log the running counters. */
static void
dump_stats(void)
{
	unotice("Number of products %d", nprods);
	unotice("Number of observations %d", nsplit);
	unotice("Number of dups %d", ndups);
}


/* defined in surf_split.c */
extern int surf_split(const prod_info *infop, const void *datap,
		int (*doit)(const prod_info *, const void *));


/*
 * surf_split() callback: insert one observation into the output queue.
 * Returns ENOERR on success, PQUEUE_DUP if it was already there, or the
 * error from pq_insertNoSig().
 */
static int
doOne(const prod_info *infop, const void *datap)
{
	struct product prod;
	int status = ENOERR;

	if(ulogIsDebug())
		udebug("%s", s_prod_info(NULL, 0, infop, 1));

	prod.info = *infop;
	prod.data = (void *)datap; /* cast away const */

	nsplit++; /* ?? Do it here on only on success ?? */

	status = pq_insertNoSig(opq, &prod);
	if(status == ENOERR)
	{
		return status; /* Normal return */
	}

	/* else */
	if(status == PQUEUE_DUP)
	{
		ndups++;
		if(ulogIsVerbose())
			uinfo("Product already in queue: %s",
				s_prod_info(NULL, 0, &prod.info,
					ulogIsDebug()));
		return status;
	}

	/* else, error */
	uerror("pq_insert: %s\n", strerror(status));

	return status;
}


/*
 * pq_sequence() callback: split one product and wake the pqact child.
 * If `vp' is not NULL it receives the number of observations produced.
 * Always returns 0.
 */
static int
split_prod(const prod_info *infop, const void *datap,
		void *xprod, size_t size, void *vp)
{
	size_t *nsp = (size_t *)vp;
	int ns;

	if(ulogIsVerbose())
		uinfo("%s", s_prod_info(NULL, 0, infop, ulogIsDebug()));

	ns = surf_split(infop, datap, doOne);

	nprods++;

	/* kill() takes (pid, signal); only signal a real child */
	if(act_pid > 0)
		(void)kill(act_pid, SIGCONT);

	if(nsp != NULL && ns >= 0)
		*nsp = (size_t)ns;

	return 0;
}


/* Print the usage message to stderr and exit(1). */
static void
usage(const char *av0) /* id string */
{
	(void)fprintf(stderr,
		"Usage: %s [options] [confilename]\t\nOptions:\n",
		av0);
	(void)fprintf(stderr,
		"\t-v Verbose, log each match (SIGUSR2 toggles)\n");
	(void)fprintf(stderr,
		"\t-x Debug mode\n");
	(void)fprintf(stderr,
		"\t-l logfile Send log info to file (default uses syslogd)\n");
	(void)fprintf(stderr,
		"\t-d datadir cd to \"datadir\" before interpreting filenames in\n");
	(void)fprintf(stderr,
		"\t conffile (default %s)\n",
		DEFAULT_SURF_DATADIR);
	(void)fprintf(stderr,
		"\t-q queue default \"%s\"\n",
		DEFAULT_QUEUE);
	(void)fprintf(stderr,
		"\t-p pattern Interested in products matching \"pattern\" (default \"%s\")\n",
		DEFAULT_PATTERN);
	(void)fprintf(stderr,
		"\t-f feedtype Interested in products from feed \"feedtype\" (default %s)\n",
		s_feedtypet(DEFAULT_FEEDTYPE));
	(void)fprintf(stderr,
		"\t-i interval loop, polling each \"interval\" seconds (default %d)\n",
		DEFAULT_INTERVAL);
	(void)fprintf(stderr,
		"\t-a age Expire products older than \"age\" hours (default %.4f)\n",
		DEFAULT_AGE);
	(void)fprintf(stderr,
		"\t-t timeo set write timeo for PIPE subprocs to \"timeo\" secs (default %d)\n",
		DEFAULT_PIPE_TIMEO);
	(void)fprintf(stderr,
		"\t-o offset the oldest product we will consider is \"offset\" secs before now (default: most recent in output queue)\n");
	(void)fprintf(stderr,
		"\t-Q outQueue default \"%s\"\n",
		DEFAULT_SURF_OUTQUEUE);
	(void)fprintf(stderr,
		"\t(default conffilename is %s)\n",
		DEFAULT_SURF_CONFFILE);
	exit(1);
}


/*
 * Wait for the pqact child with waitpid() `options'.
 * Returns the pid reported, 0 if nothing changed, -1 on error.
 * N.B. calls exit() when the child has exited or died of a
 * core-dumping signal.
 */
static pid_t
reap_act(int options)
{
	pid_t wpid = 0;
	int status = 0;

#ifndef NO_WAITPID
	wpid = waitpid(act_pid, &status, options);
#else
	if(options == 0)
		wpid = wait(&status);
	/* customize here for older systems, use wait3 or whatever */
#endif
	if(wpid == -1)
	{
		if(!(errno == ECHILD && act_pid == -1))
		{
			/* Only complain if relevant */
			serror("waitpid");
		}
		return -1;
	}
	/* else */

	if(wpid != 0)
	{
		/* tag_pid_entry(wpid); */

#ifndef NO_WAITPID
		if(WIFSTOPPED(status))
		{
			unotice("child %d stopped by signal %d",
				(int)wpid, WSTOPSIG(status));
		}
		else if(WIFSIGNALED(status))
		{
			unotice("child %d terminated by signal %d",
				(int)wpid, WTERMSIG(status));
			/* DEBUG */
			switch(WTERMSIG(status)) {
			/*
			 * If a child dumped core,
			 * shut everything down.
			 */
			case SIGQUIT:
			case SIGILL:
			case SIGTRAP: /* ??? */
			case SIGABRT:
#if defined(SIGEMT)
			case SIGEMT: /* ??? */
#endif
			case SIGFPE: /* ??? */
			case SIGBUS:
			case SIGSEGV:
#if defined(SIGSYS)
			case SIGSYS: /* ??? */
#endif
#ifdef SIGXCPU
			case SIGXCPU:
#endif
#ifdef SIGXFSZ
			case SIGXFSZ:
#endif
				act_pid = -1;
				exit(1);
				break;
			}
		}
		else if(WIFEXITED(status))
		{
			if(WEXITSTATUS(status) != 0)
				unotice("child %d exited with status %d",
					(int)wpid, WEXITSTATUS(status));
			else
				udebug("child %d exited with status %d",
					(int)wpid, WEXITSTATUS(status));
			act_pid = -1;
			exit(WEXITSTATUS(status));
		}
#endif
	}

	return wpid;
}


/*
 * atexit() handler: stop the child, close both queues, log statistics.
 */
void
cleanup(void)
{
	unotice("Exiting");

	if(act_pid != -1)
	{
		(void)signal(SIGCHLD, SIG_IGN);
		kill(act_pid, SIGTERM);
		(void) reap_act(0);
	}

	if(opq != NULL)
	{
		off_t highwater = 0;
		size_t maxregions = 0;
		(void) pq_highwater(opq, &highwater, &maxregions);
		(void) pq_close(opq);
		opq = NULL;

		unotice(" Queue usage (bytes):%8ld",
			(long)highwater);
		unotice(" (nregions):%8ld",
			(long)maxregions);
	}

	if(pq != NULL)
	{
		(void) pq_close(pq);
		pq = NULL;
	}

	dump_stats();

	(void) closeulog();
}


/* Handler for the signals registered in set_sigactions(). */
static void
signal_handler(int sig)
{
#ifdef SVR3SIGNALS
	/*
	 * Some systems reset handler to SIG_DFL upon entry to handler.
	 * In that case, we reregister our handler.
	 */
	(void) signal(sig, signal_handler);
#endif
	switch(sig) {
	case SIGINT :
		unotice("Interrupt");
		intr = !0;
		exit(0);
	case SIGTERM :
		udebug("SIGTERM");
		(void) sleep(0);
		done = !0;
		return;
	case SIGUSR1 :
		udebug("SIGUSR1");
		stats_req = !0;
		return;
	case SIGUSR2 :
		udebug("SIGUSR2");
		rollulogpri();
		return;
	case SIGCHLD :
		(void) reap_act(WNOHANG); /* usually calls exit */
		return;
	}
	udebug("signal_handler: unhandled signal: %d", sig);
}


/*
 * register the signal_handler
 */
static void
set_sigactions(void)
{
	struct sigaction sigact;

	sigemptyset(&sigact.sa_mask);
	sigact.sa_flags = 0;

	/* Ignore these */
	sigact.sa_handler = SIG_IGN;
	(void) sigaction(SIGHUP, &sigact, NULL);
	(void) sigaction(SIGPIPE, &sigact, NULL);
	(void) sigaction(SIGALRM, &sigact, NULL);

	/* Handle these */
#ifdef SA_RESTART /* SVR4, 4.3+ BSD */
	/* usually, restart system calls */
	sigact.sa_flags |= SA_RESTART;
#endif
	sigact.sa_handler = signal_handler;
	(void) sigaction(SIGTERM, &sigact, NULL);
	(void) sigaction(SIGUSR1, &sigact, NULL);
	(void) sigaction(SIGUSR2, &sigact, NULL);
	(void) sigaction(SIGCHLD, &sigact, NULL);

	/* Don't restart after interrupt */
	sigact.sa_flags = 0;
#ifdef SA_INTERRUPT /* SunOS 4.x */
	sigact.sa_flags |= SA_INTERRUPT;
#endif
	(void) sigaction(SIGINT, &sigact, NULL);
}


/*
 * Delete products older than `age' seconds from `epq'.  Does nothing
 * unless at least `interval' + `age' seconds have passed since the
 * cutoff of the previous run.  Returns the last queue status.
 */
static int
expire(pqueue *epq, const unsigned interval, const double age)
{
	int status = ENOERR;
	static timestampt now;
	static prod_class eclss;
	static prod_spec spec;
	timestampt ts;
	timestampt cursor;
	double diff = 0.;
	double max_latency = 0.;
	size_t nr;

	if(eclss.psa.psa_val == 0)
	{
		/* first time */
		eclss.from = TS_ZERO;
		eclss.psa.psa_len = 1;
		eclss.psa.psa_val = &spec;
		spec.feedtype = ANY;
		spec.pattern = ".*";
		regcomp(&spec.rgx, spec.pattern, REG_EXTENDED|REG_NOSUB);
	}

	(void) set_timestamp(&now);
	if(d_diff_timestamp(&now, &eclss.to) < interval + age)
	{
		/* only run this routine every interval seconds */
		udebug("not yet");
		return ENOERR;
	}
	/* else */
	eclss.to = now;
	eclss.to.tv_sec -= age;

	if(ulogIsDebug())
	{
		char cp[64];
		sprint_timestampt(cp, sizeof(cp), &eclss.to);
		udebug("to %s", cp);
	}

	pq_cset(epq, &TS_ZERO);

	while(!done && !stats_req)
	{
		nr = 0;
		status = pq_seqdel(epq, TV_GT, &eclss, 0, &nr, &ts);

		switch(status) {
		case ENOERR:
			pq_ctimestamp(epq, &cursor);
			diff = d_diff_timestamp(&cursor, &ts);
			if(diff > max_latency)
			{
				max_latency = diff;
				udebug("max_latency %.3f", max_latency);
			}

			if(nr == 0)
			{
				diff = d_diff_timestamp(&cursor, &eclss.to);
				udebug("diff %.3f", diff);
				if(diff > interval + max_latency)
				{
					udebug("heuristic depth break");
					break;
				}
			}
			continue; /* N.B., other cases break and return */
		case PQUEUE_END:
			udebug("expire: End of Queue");
			break;
		case EAGAIN:
		case EACCES:
			udebug("Hit a lock");
			break;
#if defined(EDEADLOCK) && EDEADLOCK != EDEADLK
		case EDEADLOCK:
#endif
		case EDEADLK:
			uerror("%s", strerror(status));
			break;
		default:
			uerror("pq_seqdel failed: %s (errno = %d)",
				strerror(status), status);
			break;
		}
		break;
	}
	return status;
}


/*
 * Append one word to the pqact argument vector, keeping the final slot
 * free for the NULL that execvp() needs.  Exits if the vector is full.
 */
static void
push_arg(char *argv[], int *argcp, char *arg)
{
	if(*argcp >= PQACT_MAX_ARGS)
	{
		(void) fprintf(stderr, "too many arguments for pqact\n");
		exit(1);
	}
	argv[(*argcp)++] = arg;
}


int
main(int ac, char *av[])
{
	const char *progname = ubasename(av[0]);
	char *logfname;
	prod_class clss;
	prod_spec spec;
	int status = 0;
	unsigned interval = DEFAULT_INTERVAL;
	int logoptions = (LOG_CONS|LOG_PID);
	double age = DEFAULT_AGE;
	/* these are containers for the pqact args */
	char *argv[PQACT_MAX_ARGS + 1];
	int argc = 0;
	int toffset = TOFFSET_NONE;

	logfname = "";

	if(set_timestamp(&clss.from) != ENOERR) /* corrected by toffset below */
	{
		int errnum = errno;
		fprintf(stderr, "Couldn't set timestamp: %s",
			strerror(errnum));
		exit(1);
	}
	clss.to = TS_ENDT;
	clss.psa.psa_len = 1;
	clss.psa.psa_val = &spec;
	spec.feedtype = DEFAULT_FEEDTYPE;
	spec.pattern = DEFAULT_PATTERN;

	/* zero fill so the vector is always NULL terminated */
	memset(argv, 0, sizeof(argv));
	push_arg(argv, &argc, "pqact");

	/*
	 * Check the environment for some options.
	 * May be overridden by command line switches below.
	 */
	{
		const char *ldmpqfname = getenv("LDMPQFNAME");
		if(ldmpqfname != NULL)
			pqfname = ldmpqfname;
	}

	{
	extern int optind;
	extern int opterr;
	extern char *optarg;
	int ch;
	int logmask = (LOG_MASK(LOG_ERR) | LOG_MASK(LOG_NOTICE));
	int fterr;
	char *conffilename = DEFAULT_SURF_CONFFILE;
	char *datadir = DEFAULT_SURF_DATADIR;

	usePil = 1;
	opterr = 1;

	while ((ch = getopt(ac, av, "vxl:d:f:p:q:Q:o:i:a:t:")) != EOF)
		switch (ch) {
		case 'v':
			push_arg(argv, &argc, "-v");
			logmask |= LOG_MASK(LOG_INFO);
			break;
		case 'x':
			push_arg(argv, &argc, "-x");
			logmask |= LOG_MASK(LOG_DEBUG);
			break;
		case 'l':
			push_arg(argv, &argc, "-l");
			push_arg(argv, &argc, optarg);
			logfname = optarg;
			break;
		case 'd':
			datadir = optarg;
			break;
		case 'f':
			fterr = strfeedtypet(optarg, &spec.feedtype);
			if(fterr != FEEDTYPE_OK)
			{
				fprintf(stderr, "%s: %s: \"%s\"\n",
					av[0], strfeederr(fterr), optarg);
				usage(progname);
			}
			push_arg(argv, &argc, "-f");
			push_arg(argv, &argc, optarg);
			break;
		case 'p':
			spec.pattern = optarg;
			/* compiled below */
			break;
		case 'q':
			pqfname = optarg;
			break;
		case 'Q':
			opqfname = optarg;
			break;
		case 'o':
			toffset = atoi(optarg);
			if(toffset == 0 && *optarg != '0')
			{
				fprintf(stderr, "%s: invalid offset %s\n",
					av[0], optarg);
				usage(av[0]);
			}
			push_arg(argv, &argc, "-o");
			push_arg(argv, &argc, optarg);
			break;
		case 'i':
			interval = atoi(optarg);
			if(interval == 0 && *optarg != '0')
			{
				fprintf(stderr, "%s: invalid interval \"%s\"\n",
					av[0], optarg);
				usage(av[0]);
			}
			/* N.B. -i just used for input queue. */
			break;
		case 'a':
			age = atof(optarg);
			if(age < 0.)
			{
				(void) fprintf(stderr,
					"age (%s) must be non negative\n",
					optarg);
				usage(av[0]);
			}
			break;
		case 't':
			/* pipe_timeo */
			push_arg(argv, &argc, "-t");
			push_arg(argv, &argc, optarg);
			break;
		case '?':
			usage(progname);
			break;
		}

	(void) setulogmask(logmask);

	status = regcomp(&spec.rgx, spec.pattern, REG_EXTENDED|REG_NOSUB);
	if(status != 0)
	{
		fprintf(stderr, "Bad regular expression \"%s\"\n",
			spec.pattern);
		usage(av[0]);
	}

	if(ac - optind == 1)
		conffilename = av[optind];

	push_arg(argv, &argc, "-d");
	push_arg(argv, &argc, datadir);
	push_arg(argv, &argc, "-q");
	push_arg(argv, &argc, opqfname);
	push_arg(argv, &argc, conffilename);

	age *= 3600.;	/* hours to seconds */

	}

	if(toffset != TOFFSET_NONE)
	{
		clss.from.tv_sec -= toffset;
	}
	else
	{
		clss.from.tv_sec -= (age - interval);
	}

	/*
	 * Set up error logging.
	 * N.B. log ident is the remote
	 */
	(void) openulog(progname,
		logoptions, LOG_LDM, logfname);
	unotice("Starting Up (%d)", (int)getpgrp());

	/*
	 * register exit handler
	 */
	if(atexit(cleanup) != 0)
	{
		serror("atexit");
		exit(1);
	}

	/*
	 * set up signal handlers
	 */
	set_sigactions();

	/*
	 * Open the Output product que
	 */
	status = pq_open(opqfname, PQ_DEFAULT, &opq);
	if(status)
	{
		uerror("pq_open failed: %s: %s\n",
			opqfname, strerror(status));
		exit(1);
	}

	act_pid = run_child(argc, argv);
	if(act_pid == (pid_t)-1)
		exit(1);

	/*
	 * Open the product que
	 */
	status = pq_open(pqfname, PQ_READONLY, &pq);
	if(status)
	{
		uerror("pq_open failed: %s: %s\n",
			pqfname, strerror(status));
		exit(1);
	}
	if(toffset == TOFFSET_NONE)
	{
		/* Jump to the end of the queue */
		timestampt sav;
		sav = clss.from;
		clss.from = TS_ZERO;
		(void) pq_last(pq, &clss, NULL);
		clss.from = sav;
	}
	else
	{
		pq_cset(pq, &clss.from);
	}

	if(ulogIsVerbose())
	{
		char buf[1984];
		uinfo("%s",
			s_prod_class(buf, sizeof(buf), &clss));
	}

	while(!done)
	{
		if(stats_req)
		{
			dump_stats();
			stats_req = 0;
		}

		status = pq_sequence(pq, TV_GT, &clss, split_prod, NULL);

		switch(status) {
		case 0: /* no error */
			continue; /* N.B., other cases sleep */
		case PQUEUE_END:
			udebug("surf: End of Queue");
			break;
		case EAGAIN:
		case EACCES:
			udebug("Hit a lock");
			break;
		default:
			uerror("pq_sequence failed: %s (errno = %d)",
				strerror(status), status);
			exit(1);
			break;
		}

		if(interval == 0)
		{
			break;
		}

		(void) expire(opq, interval, age);

		pq_suspend(interval);

	}

	/*
	 * TODO: how can we determine that pqact has finished
	 * the work in opq?
	 */
	sleep(5);

	exit(0);
}