[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: HELLO (fwd)
- Subject: Re: HELLO (fwd)
- Date: Tue, 27 Feb 2001 15:57:00 -0700 (MST)
Anne,
Here's some info about configure feedtype at the NRS side. This is for
FYI only unless you want to act on it.
Robb...
===============================================================================
Robb Kambic Unidata Program Center
Software Engineer III Univ. Corp for Atmospheric Research
address@hidden WWW: http://www.unidata.ucar.edu/
===============================================================================
---------- Forwarded message ----------
Date: Tue, 20 Feb 2001 13:31:19 +0000
From: Stonie R. Cooper <address@hidden>
To: Brendon Hoch <address@hidden>
Declan <address@hidden>, address@hidden
Subject: Re: HELLO
Brendon,
I have attached the pqpdinrs.c file with the edit that will allow you to
configure feedtype at the NRS side. Save off the attached, then do the
following, logged in as ldm (or root, if you run ldm as root):
$ cd ~/runtime/src/pqpdinrs
$ mv pqpdinrs.c pqpdinrs.c.bak
$ mv <where ever you have the attached>/pqpdinrs.c .
$ make
$ ldmadmin stop
$ make install
$ ldmadmin start
Then, on the NRS side, you add a field to the WMO/PIL recreated string to
indicate the feedtype - delimited with a pipe symbol ( "|" );
as an example . . .
WMO ^S[AP]..
{
label=metar and speci reports
action
{
socket >> my_ldm_box:3333:IDS|%T%A%i_%C_%D%G%g_%N%x
}
}
In this case, the pqpdinrs will push all the SA and SP headed bulletins onto
the LDM queue as feedtype "IDS", with the recreated WMO/AWIPS concatenated
header.
Let me know how it goes . . . you'll be the first to try this. Also, if you
don't designate a feedtype on the NRS side, the pqpdinrs will default to WMO
- so you don't have to worry about changing them all at once. It'll run with
the old config . . .
This should allow you to match the current IDD - or at least closely - so
that not only will you have the WW type preamble and closing character on
each bulletin, but you will also have ability to pass feedtype with WMO/AWIPS
header info . . .
Finally, though, if you retrieve the code for converting your GINI
imagery to the PNG formatted satellite imagery represented on IDD, you may
need additional updates to the pqpdinrs so that you can switch the preamble
and closing character on/off by message. Let me know if this becomes of
interest.
--
Stonie R. Cooper,
Science Officer
Planetary Data, Incorporated
3495 Liberty Road
Villa Rica, Georgia 30180
ph. (770) 456-0700; pg. (888) 974-5017; fx. (770) 459-0016
/******************************************************************************/
/* Copyright 1998, Planetary Data, Incorporated, Marietta, Georgia USA */
/* The purchaser has the right, under the purchase agreement, to edit, */
/* change, or otherwise use the C programming source code contained in this */
/* file for any purpose, other than resale, or inclusion in resold */
/* software. */
/* File: pqpdinrs.c */
/* Author: Stonie R. Cooper, Science Officer, PDI */
/* Documentation: PDINRS Manual, Maintaining and customizing your NOAAPort */
/* Receive System, Version 1.2 */
/* Revision Record: */
/* Created: 19990523.1531UTC by Stonie R. Cooper */
/* - adapted file from original pdinrsd.c; source for client */
/* software distributed with pdinrs. */
/* Revised: 20000223.1300UTC by Stonie R. Cooper */
/* - NOAA has decided to send some messages without the */
/* control tail string; added false_tail implementation to */
/* keep apps dependent on the ^C alive and well. */
/* Compiling directions - */
/* run patching script for inserting into LDM distro; then rerun */
/* ./configure; make; make install - just like you did for original LDM. */
/******************************************************************************/
/* WARRANTY STATEMENT - there is no warranty with this source code - implied */
/* or otherwise; it is delivered "as is" as a service to customers looking at */
/* using the pdinrs with Unidata's LDM software. If you find something wrong */
/* with the code - fix it yourself. Feel free to alter and make the code do */
/* whatever you want it to - but we don't support it, warranty it, or */
/* guarantee results. */
/******************************************************************************/
/* This program's function is to receive a data stream on an assigned port */
/* from a PDI NOAAPort Receive System, and prepare data for ldm. */
/******************************************************************************/
/* System includes */
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/time.h>
#include <signal.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include "pqinsert_function.h"
/* Defines */
#define BACKLOG 10 /* max pending connections */
#define MAXFRAMESIZE 5204 /* max _frame_ size (not message size) */
#ifndef FALSE
#define FALSE 0 /* some people don't, so we check */
#endif
#ifndef TRUE
#define TRUE 1 /* some people don't, so we check */
#endif
/* Size of the in-memory product buffer: 2 MB.  The expansion is parenthesized */
/* so the macro behaves as a single value inside any expression (for example   */
/* "x % BUFFER_SIZE" or "x / BUFFER_SIZE").                                     */
#define BUFFER_SIZE (1024*1024*2)
/* NOTE: This buffering will obviously do no good for the satellite channels, */
/* with realized file sizes in the tens of MBs.                               */

/* Global state shared between main() and the signal handler.                 */
/* NOTE(review): verbose and memory are written from signal_handler(); they   */
/* are left as plain ints here because other translation units may declare    */
/* them extern with this type - confirm before changing them to               */
/* volatile sig_atomic_t.                                                     */
unsigned char keep_going = TRUE; /* accept loop runs while this is non-zero */
int verbose = FALSE;             /* FALSE, TRUE, or 2 (debug; set by -x)    */
int memory = FALSE;              /* TRUE: buffer products in memory (-m)    */
char filename[256];              /* current temporary product file, or ""   */
/* Bytes placed in front of every product held in the memory buffer. */
char false_header[] = { 1, '\r', '\r', '\n', '9', '9', '9',
                        '\r', '\r', '\n', '\0' };
/* Added 20000223 for missing ^C in some NWS messages. */
char false_tail[] = { '\r', '\r', '\n', 3, '\0' };

/* Function prototypes */
void print_usage(void);
/* These following two protos are Unidata's - they substitute for PDI's normal*/
/* signal handlers as they tie the rpc'ed process handling for proper start   */
/* and stopping of ldm.                                                       */
static void signal_handler(int sig);
static void set_sigactions(void);
/* MAIN */
void main (int argc, char *argv[])
{
FILE *opf = NULL;
char *buf_counter;
char last_byte = 0;
char logfile[256];
char queue[256];
char pqhome[256];
char size[5];
char start_flag = 0;
char write_flag = 0;
fd_set rfds;
feedtypet feedtype;
feedtypet default_feedtype;
int bcount = 0; /* this is our _frame_ (not product) size counter */
int bytes_read = 0;
int c = 0;
int count = 0; /* just a generic counter that we use in for loops */
int frame_size = 0;
int new_fd = 0; /* new connection on new_fd */
int one = 1;
int output_switch = 0;
int retval = 0;
int sin_size = 0; /* size of a sockaddr_in - we use this in the accept */
int sockfd = 0; /* listen on sock_fd */
int time_out = 0;
short int port = 0;
struct timeval tv;
struct sockaddr_in my_addr; /* my address information */
struct sockaddr_in their_addr; /* connector's address information */
/* Have to at least have one argument - the port, so an easy kill if true. */
if (argc < 2)
{
printf("Usage: %s -[fhlmqv] <port number>\n", argv[0]);
printf(" : (<port number> must be greater than 2000)\n");
exit(1);
}
/* We'll initiate a lot of our variables here. */
memset((void *)queue, 0, sizeof(queue));
memset((void *)logfile, 0, sizeof(logfile));
memset((void *)pqhome, 0, sizeof(pqhome));
memset((void *)filename, 0, sizeof(filename));
memset((void *)filename, 0, sizeof(filename));
if ((getenv("LDMHOME")) != NULL)
strcpy(pqhome, (getenv("LDMHOME")));
else
strcpy(pqhome, "/usr/local/ldm");
if ((getenv("LDMPQFNAME")) != NULL)
strcpy(queue, (getenv("LDMPQFNAME")));
else
sprintf(queue, "%s/data/ldm.pq", pqhome);
sprintf(logfile, "%s/logs/pqpdinrs.log", pqhome);
default_feedtype = atofeedtypet("WMO");
/* And we need to retrieve our arguments and the values associated to replace */
/* the defaults. */
while (TRUE)
{
c = getopt(argc, argv, "f:hl:mq:vx");
if (c == -1)
break;
switch (c)
{
case 0:
{
print_usage();
break;
}
case 'f':
{
if ((strlen(optarg)) != 0)
{
default_feedtype = atofeedtypet(optarg);
}
break;
}
case 'h':
{
print_usage();
break;
}
case 'l':
{
if ((strlen(optarg)) != 0)
{
memset((void *)logfile, 0, sizeof(logfile));
strcpy(logfile, optarg);
}
break;
}
case 'm':
{
memory = TRUE;
break;
}
case 'q':
{
if ((strlen(optarg)) != 0)
{
memset((void *)queue, 0, sizeof(queue));
strcpy(queue, optarg);
}
break;
}
case 'v':
{
verbose = TRUE;
break;
}
case 'x':
{
verbose = 2;
break;
}
default:
{
print_usage();
}
}
}
argc -= optind;
argv += optind;
if (optind < argc)
{
printf("pqpdinrs: arguments unrecognizable: ");
while (optind < argc)
printf ("%s ", argv[optind++]);
printf("\n");
printf(" - stopping execution.\n");
exit(0);
}
/* Get the port number to listen on, and make sure it's a good one. */
for (count = 0; count < strlen(argv[(argc - 1)]); count++)
{
if ((isdigit(argv[(argc - 1)][count])) == 0)
{
printf("Usage: %s -[fhlmqv] <port number>\n", argv[0]);
printf(" : (<port number> must be greater than 2000)\n");
exit(1);
}
}
port = atoi(argv[(argc - 1)]);
if (port < 2000 && port > 32000)
{
printf("Port = %d; illegal port number.\n", port);
printf("Usage: %s -[fhlmqv] <port number>\n", argv[0]);
printf(" : (<port number> must be greater than 2000)\n");
exit(1);
}
/* Daemonization of pdinrsd is removed from here - to use with ldm. */
/* Establish signal handler to shutdown gracefully and close socket */
set_sigactions();
/* Create the socket */
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
exit(-1);
}
my_addr.sin_family = AF_INET; /* host byte order */
my_addr.sin_port = htons(port); /* short, network byte order */
my_addr.sin_addr.s_addr = INADDR_ANY; /* automatically fill with my IP */
bzero(&(my_addr.sin_zero), 8); /* zero the rest of the struct */
/* Allow port reuse on the socket */
if ((setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&one,sizeof(int)))
== -1)
{
exit(-1);
}
/* Bind the socket to a port */
if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1)
{
exit(-2);
}
/* Begin to listen for connections */
if (listen(sockfd, BACKLOG) == -1)
{
exit(-3);
}
/* Main accept loop */
while(keep_going)
{
sin_size = sizeof(struct sockaddr_in);
/* Accept a new connection */
new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
/* Spawn child process to handle this connection. */
if (fork() == 0)
{
char buf[MAXFRAMESIZE+sizeof(char)+4];
char real_buffer[BUFFER_SIZE + sizeof(false_header)];
char *buffer = NULL;
char *temp_ptr = NULL;
unsigned long int product_size = 0;
char *buf_ptr = NULL;
unsigned char first_frame = TRUE;
char wmohdr[258];
char pdi_feed_type[32];
unsigned char error_code = 0;
struct timeval child_tv;
struct timezone child_tz;
memset((void *)buf, 0, sizeof(buf));
memcpy((void *)real_buffer, (void *)false_header, strlen(false_header));
buffer = &real_buffer[(strlen(false_header))];
/* Close socket discriptor. Not needed by child process */
close(sockfd);
/* Set up file descriptor set for select. The set only contains new_fd as
select is used only to handle a timeout on the connection. */
FD_ZERO(&rfds);
FD_SET(new_fd, &rfds);
time_out = FALSE;
while (!time_out)
{
/* Set timeout structure */
tv.tv_sec = 600;
tv.tv_usec = 0;
retval = select(new_fd+1, &rfds, NULL, NULL, &tv);
/* If select returns a positive number there is data on the socket. We
could use FD_ISSET(new_fd, &rfds) for this test. */
if (retval >= 1)
{
/* Read the first byte to see if this is a start frame, data frame, or
close file frame. */
if ((bytes_read = recv(new_fd, &start_flag, 1, 0)) > 0)
{
/* A start frame is composed of the following:
byte 1 - set to value 1 to signify start frame
byte 2 - set to 3 for file create/overwrite
set to 4 for file append
bytes 3-258 - file pathname */
if (start_flag == 1)
{
if (output_switch)
{
memset((void *)buffer, 0, BUFFER_SIZE);
buf_ptr = NULL;
product_size = 0;
output_switch = 0;
}
bcount = 0;
bytes_read = 0;
buf_counter = buf;
/* We are not guaranteed to read all of the bytes that we requested, so we
must continue until we have enough */
while (bcount < 257)
{
if ((bytes_read = recv(new_fd, buf_counter,
(257 - bcount), 0)) > 0)
{
bcount += bytes_read;
buf_counter += bytes_read;
}
}
if (bcount == 257)
{
write_flag = *buf;
memset((void *)wmohdr, 0, sizeof(wmohdr));
strncpy(wmohdr, buf+1, 256);
for (count = 0; count < (strlen(wmohdr)); count++)
{
if (wmohdr[count] == '_')
wmohdr[count] = ' ';
}
temp_ptr = NULL;
memset((void
*)pdi_feed_type, 0, sizeof(pdi_feed_type));
feedtype = default_feedtype;
if ((temp_ptr =
strchr(wmohdr, '|')) != NULL)
{
char
temp_string[256];
memset((void
*)temp_string, 0, sizeof(temp_string));
strncpy(pdi_feed_type, wmohdr,
((strlen(wmohdr)) - (strlen(temp_ptr))));
temp_ptr++;
sprintf(temp_string, "%s", temp_ptr);
memset((void *)wmohdr, 0, sizeof(wmohdr));
strcpy(wmohdr, temp_string);
feedtype = atofeedtypet(pdi_feed_type);
}
count = strlen(wmohdr);
wmohdr[count] = '\r';
wmohdr[(count+1)] = '\r';
wmohdr[(count+2)] = '\n';
wmohdr[(count+3)] = '\0';
output_switch = 1;
}
}
else
{
/* A data frame is composed of the following:
byte 1 - set to value 0 to signify data frame
byte 2-5 - number of data bytes following (ascii characters; maximum 9999)
bytes 6-variable - data */
if (start_flag == 0)
{
if (output_switch)
{
bcount = 0;
bytes_read = 0;
buf_counter = buf;
/* Get the size of the data. Remember it's ascii so we must convert it. */
while (bcount < 4)
{
if ((bytes_read = recv(new_fd, buf_counter,
(4 - bcount), 0)) > 0)
{
bcount += bytes_read;
buf_counter += bytes_read;
}
else if (bytes_read < 0)
fprintf(stderr, "Error reading socket.");
}
strncpy(size, buf, 4);
size[4] = '\0';
frame_size = atoi(size);
/* Again, read until we get all of the bytes that we requested. */
bcount = 0;
bytes_read = 0;
buf_counter = buf;
while (bcount < frame_size)
{
if ((bytes_read = recv(new_fd, buf_counter,
(frame_size - bcount), 0)) > 0)
{
bcount += bytes_read;
buf_counter += bytes_read;
}
else if (bytes_read < 0)
perror("read");
}
/* For the first frame, we have a couple of things we need to do - */
/* First, we need to set the buffer pointer to the byte array, */
/* we also need to set up /tmp space (i.e. create a temp file) if we are */
/* not using the memory buffer, and then we are ready to load. */
if (first_frame)
{
first_frame = FALSE;
if (memory)
{
buf_ptr = buffer;
}
else
{
buf_ptr = NULL;
product_size = 0;
gettimeofday(&child_tv, &child_tz);
sprintf(filename, "/tmp/%lu.%lu", child_tv.tv_sec,
child_tv.tv_usec);
if ((opf = fopen(filename, "wb")) == NULL)
{
opf = NULL;
filename[0] = '\0';
}
}
}
/* This is where we load the data - if memory, we simply load it into our */
/* big buffer, if using files, we write it to our tmp file, that was named */
/* using seconds and useconds from the current timeval struct. Feel free */
/* to get fancier here, i.e. calculate some mystical name from the WMO, or */
/* some random sequence, but as timeval is sequencial, there is no chance */
/* of duplicates unless you are running multiple sessions. Then you may */
/* want to get fancy with the /tmp space file nameing. */
if (memory)
{
if ((product_size + bcount) <= BUFFER_SIZE)
{
memcpy((void *)buf_ptr, buf, bcount);
buf_ptr = &buf_ptr[bcount];
product_size += bcount;
}
else
{
fprintf(stderr, "pqpdinrs - product larger than
buffer.\n");
fflush(stderr);
exit(1);
}
}
else
{
/* Added 20000223 for missing ^C in some NWS messages: BEGIN. */
last_byte = buf[(bcount - 1)];
/* Added 20000223 for missing ^C in some NWS messages: END. */
if ((opf != NULL) && ((fwrite((void *)buf, bcount, 1, opf))
!= 1))
{
fclose(opf);
opf = NULL;
if ((strlen(filename)) != 0)
{
remove(filename);
filename[0] = '\0';
}
}
}
}
}
else
{
/* So, we got this far. If we arrive here, it means the socket code got a */
/* "2", which means the product is complete. Let us tidy things up, then */
/* call the LDM function that we have adapted for this wacky pdinrs thing. */
first_frame = TRUE;
/* If we are not using memory, and the file descriptor is actually pointing */
/* to something, we close it. If you don't do this, some of the smaller */
/* files will never see the ldm, as the flush doesn't occur until now. */
if ((!memory) && (opf != NULL))
{
/* Added 20000223 for missing ^C in some NWS messages: BEGIN. */
if (last_byte != 3)
{
if ((fwrite((void *)false_tail, 4, 1, opf)) != 1)
{
fprintf(stderr, "pqpdinrs: Couldn't write false tail\n");
fflush(stderr);
}
else
product_size+=strlen(false_tail);
}
/* Added 20000223 for missing ^C in some NWS messages: END. */
fclose(opf);
opf = NULL;
}
/* Added 20000223 for missing ^C in some NWS messages: BEGIN. */
else if (buffer[(product_size -1)] != 3)
{
memcpy((void *)&buffer[product_size], false_tail, 5);
product_size+=strlen(false_tail);
}
/* Added 20000223 for missing ^C in some NWS messages: END. */
/* To quote those all important comments in the Unidata LDM code - let's do */
/* the deed. */
if ((error_code = pqinsert_function(real_buffer, filename,
logfile, queue, argv[0], wmohdr, feedtype, verbose,
product_size + strlen(false_header),
memory)) != SUCCESSFUL)
{
fprintf(stderr, "pqpdinrs = %d\n", error_code);
fflush(stderr);
}
/* We check to see if we are using memory instead of files. The memory */
/* option is certainly faster, but you are limited to a buffer size; for */
/* the initial writing of this, I have made it a couple MB. */
if (memory)
{
memset((void *)buffer, 0, BUFFER_SIZE);
buf_ptr = NULL;
product_size = 0;
}
/* And if we are not using memory to buffer, then we need to get rid of the */
/* /tmp file - a simple remove ought to do it - handles errors better. */
else if ((strlen(filename)) != 0)
{
remove(filename);
filename[0] = '\0';
}
/* We need to set the output_switch back so we will know to initialize our */
/* buffer and check our file descriptor - initialization stuff. */
if (output_switch)
{
output_switch = 0;
}
}
}
}
else
{
/* Select sees an EOF on the socket but the read returns 0 bytes read. This */
/* means that the other side has closed the socket. Set timeout so we can */
/* loop back up to accept another connection. */
time_out = TRUE;
}
}
else
{
/* If a time-out value is used with select and it returns a 0, then the select
timed out without seeing any data. We set the timeout flag accordingly. */
if (retval == 0)
{
time_out = TRUE;
}
}
}
if (opf != NULL)
{
fclose(opf);
opf = NULL;
}
if ((strlen(filename)) != 0)
{
remove(filename);
filename[0] = '\0';
}
close(new_fd); /* close descripter for child */
exit(0);
}
close(new_fd); /* close descripter for parent; not needed */
}
close(sockfd);
exit(0);
return;
}
/* Signal handling code from LDM - don't know what they can have fail.        */
/*                                                                            */
/* Dispatches on the signal number:                                           */
/*   SIGCHLD                  - reap every child that has already exited      */
/*   SIGINT, SIGTERM, SIGALRM - delete the temporary product file, exit(1)    */
/*   SIGPIPE                  - report through udebug() and carry on          */
/*   SIGUSR1                  - cycle verbose: FALSE -> TRUE -> 2 -> FALSE    */
/*   SIGUSR2                  - toggle memory buffering; when turning it on,  */
/*                              the temporary product file is deleted first   */
/* Any other signal is reported through udebug().                             */
static void signal_handler(int sig)
{
    pid_t reaped;
    int status;

#ifdef SVR3SIGNALS
    /*
     * Some systems reset handler to SIG_DFL upon entry to handler.
     * In that case, we reregister our handler.
     */
    (void) signal(sig, signal_handler);
#endif

    if (sig == SIGCHLD) {
        /* Non-blocking: stop as soon as no exited child is left. */
        do {
            reaped = waitpid(-1, &status, WNOHANG);
        } while (reaped > 0);
        return;
    }

    if (sig == SIGINT || sig == SIGTERM || sig == SIGALRM) {
        if (filename[0] != '\0') {
            remove(filename);
            filename[0] = '\0';
        }
        exit(1);
    }

    if (sig == SIGPIPE) {
        udebug("SIGPIPE");
        return;
    }

    if (sig == SIGUSR1) {
        switch (verbose) {
        case TRUE:
            verbose = 2;
            break;
        case 2:
            verbose = FALSE;
            break;
        default:
            verbose = TRUE;
            break;
        }
        return;
    }

    if (sig == SIGUSR2) {
        if (!memory) {
            if (filename[0] != '\0') {
                remove(filename);
                filename[0] = '\0';
            }
            memory = TRUE;
        } else {
            memory = FALSE;
        }
        return;
    }

    udebug("signal_handler: unhandled signal: %d", sig);
}
/* Signal handling code from LDM - don't know what they can have fail. */
static void set_sigactions(void)
{
#ifndef NO_POSIXSIGNALS
struct sigaction sigact;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = 0;
/* Ignore these */
sigact.sa_handler = SIG_IGN;
(void) sigaction(SIGHUP, &sigact, NULL);
(void) sigaction(SIGCHLD, &sigact, NULL);
(void) sigaction(SIGPIPE, &sigact, NULL);
(void) sigaction(SIGUSR1, &sigact, NULL);
(void) sigaction(SIGUSR2, &sigact, NULL);
/* Handle these */
#ifdef SA_RESTART /* SVR4, 4.3+ BSD */
/* usually, restart system calls */
sigact.sa_flags |= SA_RESTART;
#endif
sigact.sa_handler = signal_handler;
(void) sigaction(SIGTERM, &sigact, NULL);
(void) sigaction(SIGCHLD, &sigact, NULL);
(void) sigaction(SIGALRM, &sigact, NULL);
(void) sigaction(SIGUSR1, &sigact, NULL);
(void) sigaction(SIGUSR2, &sigact, NULL);
/* Don't restart after interrupt */
sigact.sa_flags = 0;
#ifdef SA_INTERRUPT /* SunOS 4.x */
sigact.sa_flags |= SA_INTERRUPT;
#endif
(void) sigaction(SIGINT, &sigact, NULL);
#else
(void) signal(SIGHUP, SIG_IGN);
(void) signal(SIGPIPE, SIG_IGN);
(void) signal(SIGALRM, SIG_IGN);
(void) signal(SIGCHLD, SIG_IGN);
(void) signal(SIGTERM, signal_handler);
(void) signal(SIGPIPE, signal_handler);
(void) signal(SIGINT, signal_handler);
#endif
}
/* Print the command-line help text to stdout and terminate the process with  */
/* exit status 0.  Does not return.                                           */
void print_usage(void)
{
    printf("pqpdinrs: PDINRS daemon for feeding data to Unidata's LDM:\n");
    /* The synopsis lists -x as well, since getopt accepts it and it is */
    /* described below.                                                 */
    printf("pqpdinrs -[fhlmqvx] <socket number>\n");
    printf(" -f : LDM feedtype (example - \"WMO\");\n");
    printf(" -h : this quick help;\n");
    printf(" -l : log file; default is $LDMHOME/logs/pqpdinrs.log;\n");
    printf(" -m : buffer data in memory (2 MB max limit on products);\n");
    printf(" -q : queue file; default is $LDMHOME/data/ldm.pq;\n");
    printf(" -v : run in verbose mode;\n");
    printf(" -x : run in verbose mode, with debugging messages;\n");
    /* This literal had been split across two source lines, which does not */
    /* compile; it is rejoined using adjacent-literal concatenation.       */
    printf(" <socket number> : socket port to connect to PDINRS - required "
           "argument.\n");
    exit(0);
}