This archive contains answers to questions sent to Unidata support through mid-2025. Note that the archive is no longer being updated. We provide the archive for reference; many of the answers presented here remain technically correct, even if somewhat outdated. For the most up-to-date information on the use of NSF Unidata software and data services, please consult the Software Documentation first.
Steven, >Date: Thu, 31 Jul 2003 11:23:40 -0500 >From: Steven Danz <address@hidden> >Organization: Aviation Weather Center >To: address@hidden >Subject: Questions/issues with pqact The above message contained the following: > I've run across an interesting issue with pqact, and while I don't > have a solution, I have a minor (maybe) change for pqact and some > questions (of course). > > What I'm seeing: if I watch the logs for the gempak decoders I'm > running, I'm seeing the metar, taf, etc. (actually all of them) decoders > start/stop/start every minute or so. While this thrashing > (starting/stopping) isn't great, it shouldn't break anything. The > problem that is occurring is that every once in a while a new decoder > starts up before the old decoder stops, which corrupts the output file. > > First, why the thrashing? In its current form, the filel.c only allows > for 32 open files or pipes. When there are a large number of files > and pipes in use (which there are in my case; I have tons of files > being written), there is the potential for thrashing the pipes as > the decoders are shut down and restarted. Why not use code similar to > action.c to determine the number of possible open files by asking the > OS, and then dropping off some for std[in/out/err], shared memory, > rpc. Dividing by 2 seems a bit pessimistic, but an option. I've > attached a filel.c (from 5.2.2, 6.1.14 has the same issue) where I'm > trying this and it seems to work. Good points. Unfortunately, I haven't worked with pqact(1) yet, so I don't yet know what to say. I'll put this on my list of things to investigate. One thing springs to mind: if I put the LDM package on SourceForge, would you be willing to be a developer? > Second, why the startup of a decoder before the old one shuts down? 
I > haven't had a chance to work my way through the code yet, but I'm > wondering if the mechanism used to close the pipe waits until the > application at the other end is completely closed, or does it just > close the pipe at the pqact end and move on, not waiting for the > process to terminate? I'm afraid I don't yet know the answer. > As a workaround, I'm thinking of running two pqacts, one with a > configuration with just pipes and execs, the other with all the files. > Is this supported? Yes. In fact, that's typically the way the GEMPAK and McIDAS decoders work: they have their own pqact(1) configuration files. > I wasn't sure if running multiple pqacts on the > same queue would work or not. Shouldn't be a problem. > Thanks for your time > > Steven > > -- > Steven Danz > Senior Software Development Engineer > Aviation Weather Center (NOAA/NWS/NCEP) > 7220 NW 101st Terrace, Room 101 > Kansas City, MO 64153-2371 > > Email: address@hidden > Phone: 816.584.7251 > Fax: 816.880.0650 > URL: http://aviationweather.gov/ > > The opinions expressed in this message do not necessarily reflect those > of the National Weather Service, or the Aviation Weather Center. Regards, Steve Emmerson > --------------070609090501010602000202 > Content-Type: text/plain; > name="filel.c" > Content-Transfer-Encoding: 7bit > Content-Disposition: inline; > filename="filel.c" > > /* > * Copyright 1993, University Corporation for Atmospheric Research > * See ../COPYRIGHT file for copying and redistribution conditions. 
> */ > /* $Id: filel.c,v 1.2 2003/07/31 05:07:55 sdanz Exp $ */ > > /* #define _POSIX_SOURCE */ > > #include <features.h> > #include <ldmconfig.h> > #include <stdio.h> > #include <assert.h> > #include <string.h> > #include <ctype.h> > #include <limits.h> /* PATH_MAX */ > #ifndef PATH_MAX > #define PATH_MAX 255 > #endif /* !PATH_MAX */ > #include <sys/types.h> > #include <sys/stat.h> > #include <fcntl.h> /* O_RDONLY et al */ > #include <unistd.h> /* access, lseek */ > #include <signal.h> > #include <errno.h> > > #if defined(_AIX) && !defined(NO_WAITPID) > /* > * Use POSIX wait macros, not _BSD > */ > #define _H_M_WAIT > #endif > #include <sys/wait.h> > > > #include "filel.h" > #include "action.h" > #include "ldm.h" > #include "ldmalloc.h" > #include "mkdirs_open.h" > #include "ulog.h" > #include "pbuf.h" > > /* > * Defined in pqcat.c > */ > extern int pipe_timeo; > > #ifndef NO_DB > > /* > * Define DB_XPROD non zero if you want the whole "product" data > * structure put into the database, otherwise just the data goes in. > * Be sure this is consistant with ../dbcat and whatever else used > * to read the files. > */ > # ifndef DB_XPROD > # define DB_XPROD 1 > # endif > > /* > * Backward compatibility. > * If you want to use gdbm interfaces, define USE_GDBM > */ > # ifdef USE_GDBM > # include "gdbm.h" > # else > # include <ndbm.h> > # endif > > #endif /* !NO_DB */ > > /* > * Tuning parameter: MAXENTRIES is the number of descriptors > * you wish to allocate to this list. You need to leave enough around > * for the error output and for rpc. 
> */ > /* #define MAXENTRIES OPEN_MAX/2 */ > #define MAXENTRIES 32 > > > /* > * > */ > typedef enum { > FT_NONE = 0, > UNIXIO, > STDIO, > PIPE, > FT_DB > } ft_t ; > > > union f_handle { > int fd; > FILE *stream; > pbuf *pbuf; > #ifndef NO_DB > # ifdef USE_GDBM > GDBM_FILE db; > # else > DBM *db; > # endif > #endif /*!NO_DB*/ > }; > typedef union f_handle f_handle; > > > struct fl_entry { > int flags; > ft_t type; > struct fl_entry *next; > struct fl_entry *prev; > struct fl_ops *ops; > f_handle handle; > long serial; /* a serial number for this connection */ > unsigned long private; /* pid, hstat *, read/write flg */ > char path[PATH_MAX+1]; > }; > typedef struct fl_entry fl_entry; > > #if defined(__cplusplus) || defined(__STDC__) > struct fl_ops { > int (*cmp)(fl_entry *, int, char**); > int (*open)(fl_entry *, int, char**); > void (*close)(fl_entry *); > int (*sync)(fl_entry *, int); > int (*put)(fl_entry *, const char *, > const void *, size_t ); > }; > #else /* Old Style C */ > struct fl_ops { > int (*cmp)(); > int (*open)(); > void (*close)(); > int (*sync)(); > int (*dbufput)(); > }; > #endif > > > /* > * the one global list of of open files > */ > static struct fl { > int size; > fl_entry *head; > fl_entry *tail; > } thefl[] = { > 0 , > NULL , > NULL > }; > > > #define TO_HEAD(entry) \ > if(thefl->head != entry) to_head(entry) > > static void > to_head(fl_entry *entry) > { > if(thefl->head == entry) > return; > > if(entry->prev != NULL) > entry->prev->next = entry->next; > if(entry->next != NULL) > entry->next->prev = entry->prev; > > if(thefl->head != NULL) > thefl->head->prev = entry; > if(thefl->tail == entry) > thefl->tail = entry->prev; > > entry->next = thefl->head; > entry->prev = NULL; > thefl->head = entry; > if(thefl->tail == NULL) > thefl->tail = entry; > } > > > static void > free_fl_entry(fl_entry *entry) > { > if(entry == NULL) return; > > if(entry->ops != NULL) > { > entry->ops->close(entry); > } > free(entry); > } > > > /* > * generate a 
serial number, used to validate > * Action cache entries. > */ > static long > newSerial(void) > { > struct timeval now; > if( gettimeofday(&now, NULL) < 0) > { > serror("newSerial: gettimeofday"); > return ((long) rand() ); > } > return ( ((now.tv_sec % 1000000) * 1000) + (now.tv_usec / 1000)); > } > > /* forward reference */ > static fl_entry * new_fl_entry(ft_t type, int argc, char **argv); > > #ifdef FL_DEBUG > static void > dump_fl(void) > { > fl_entry *entry; > int fd; > > udebug(" thefl->size %d", thefl->size); > for(entry = thefl->head; entry != NULL; > entry = entry->next ) > { > switch (entry->type) { > case UNIXIO : > fd = entry->handle.fd; > break; > case STDIO : > fd = entry->handle.stream == NULL > ? -1 : fileno(entry->handle.stream); > break; > case PIPE : > fd = entry->handle.pbuf == NULL > ? -1 : entry->handle.pbuf->pfd; > break; > case FT_DB : > #ifndef NO_DB > fd = entry->handle.db == NULL > ? -1 : -2; > break; > #endif /* !NO_DB */ > default : > fd = -2; > } > udebug(" %d %s", fd, entry->path); > } > } > #endif > > static fl_entry * > lookup_fl_entry(ft_t type, int argc, char **argv) > { > fl_entry *entry = NULL; > > for(entry = thefl->head; entry != NULL; > entry = entry->next ) > { > if(entry->type == type && > entry->ops->cmp(entry, argc, argv) == 0) > break; > } > return entry; > } > > > static void > delete_entry(fl_entry *entry) > { > /* assert(thefl->size >= 1); */ > if(entry == NULL) return; > > if(entry->prev != NULL) > entry->prev->next = entry->next; > if(entry->next != NULL) > entry->next->prev = entry->prev; > if(thefl->head == entry) > thefl->head = entry->next; > if(thefl->tail == entry) > thefl->tail = entry->prev; > thefl->size--; > > free_fl_entry(entry); > } > > > /* > * sync up to nentries entries, tail to head. 
> */ > void > fl_sync(int nentries, > int block) /* bool_t, FALSE => nonblocking */ > { > fl_entry *entry, *prev; > > /* udebug(" fl_sync"); */ > > if(thefl->size <= 0) > return; > if(nentries == -1) /* sync everyone */ > nentries = thefl->size; > > for(entry = thefl->tail; > entry != NULL && nentries >= 0; entry = prev, nentries--) > { > prev = entry->prev; > if(entry->flags & FL_NEEDS_SYNC) > { > if(entry->ops->sync(entry, block) == -1) > delete_entry(entry); > } > } > } > > > /* > * close the "least recently used" entry > */ > void > close_lru(int skipflags) > { > fl_entry *entry, *prev; > > if(thefl->size <= 0) > return; > entry = thefl->tail; > > > for(entry = thefl->tail; > entry != NULL; entry = prev) > { > prev = entry->prev; > /* twisted logic */ > if(entry->flags & skipflags) > continue; > /* else */ > /* udebug(" close_lru: %s", entry->path); */ > delete_entry(entry); > return; > } > } > > > void > fl_close_all(void) > { > while(thefl->size > 0) > { > close_lru(0); > } > } > > > /* > * Look for an fl_entry in the list. > * If there isn't one there that matches what you need, make a new one. 
> */ > static fl_entry * > get_fl_entry(ft_t type, int argc, char **argv) > { > fl_entry *entry; > > static int open_max = 0; /* number of descriptors */ > if (!open_max) > { > #ifdef _SC_OPEN_MAX > open_max = (int) sysconf(_SC_OPEN_MAX); > /* Hold some aside for std[in/out/err], shm handles and libraries */ > if (open_max > 32) { > open_max = open_max - 16; > } else { > open_max = open_max/2; > } > #else > open_max = 32; /* punt */ > #endif > } > > entry = lookup_fl_entry(type, argc, argv); > if( entry != NULL ) > { > TO_HEAD(entry); > #ifdef FL_DEBUG > dump_fl(); > #endif > return entry; > } > /* else */ > > if(thefl->size >= open_max) > close_lru(0); > > entry = new_fl_entry(type, argc, argv); > if( entry == NULL ) > { > return NULL; /* malloc or open failed */ > } > > /* to front */ > if(thefl->head != NULL) > thefl->head->prev = entry; > entry->next = thefl->head; > entry->prev = NULL; > thefl->head = entry; > if(thefl->tail == NULL) > thefl->tail = entry; > thefl->size++; > > #ifdef FL_DEBUG > dump_fl(); > #endif > return entry; > } > > > static int > atFinishedArgs(int ac, > char *av[], > fl_entry *entry) > { > int status = 0; > int syncflag = 0; > int closeflag = 0; > for(; ac > 1 && *av[0] == '-'; ac-- , av++) > { > if( strncmp(*av,"-close",3) == 0) > { > closeflag = !0; > } > else if( strncmp(*av,"-flush",3) == 0) > { > syncflag = !0; > } > } > if(syncflag) > status = (*entry->ops->sync)(entry, syncflag); > if(closeflag) > delete_entry(entry); > return status; > } > > > /* > * Given a dbuf, return a copy with the non '\n' > * control characters removed. > * Remember to free the result. 
> */ > static void * > dupstrip(const void *in, size_t len, size_t *outlenp) > { > void *out; > size_t blen; > const unsigned char *ip; > char *op; > > if(in == NULL || len == 0) > return NULL; > > out = malloc(len); > if(out == NULL) > { > serror("dupstrip: malloc %ld failed", (long) len); > return NULL; > } > > for(blen = len, ip = in, op = out, *outlenp = 0; > blen != 0; blen--, ip++) > { > if(((int)*ip) > 127 > || (iscntrl(*ip) && *ip != '\n')) > continue; > /* else */ > *op++ = *ip; > (*outlenp)++; > } > > return out; > } > > > /* Begin UNIXIO */ > static int > str_cmp( fl_entry *entry, int argc, char **argv) > { > char *path; > > assert(argc > 0); > assert(argv[argc -1] != NULL); > assert(*argv[argc -1] != 0); > > path = argv[argc-1]; > return(strcmp(path, entry->path)); > } > > > static int > unio_open(fl_entry *entry, int ac, char **av) > { > char *path; > int flags = (O_WRONLY|O_CREAT); > > assert(ac > 0); > assert(av[ac -1] != NULL); > assert(*av[ac -1] != 0); > > entry->handle.fd = -1; > > for(; ac > 1 && *av[0] == '-'; ac-- , av++) > { > if( strncmp(*av,"-overwrite",3) == 0) > { > flags |= O_TRUNC; > } > else if( strncmp(*av,"-strip",3) == 0) > { > entry->flags |= FL_STRIP; > } > } > > path = av[ac-1]; > > entry->handle.fd = mkdirs_open(path, flags, 0666); > if(entry->handle.fd == -1) > { > if(errno == EMFILE) > { > /* Too many open files */ > close_lru(0); > close_lru(0); > } > serror("unio_open: %s", path); > return -1; > } > if(!(flags & O_TRUNC)) > if(lseek(entry->handle.fd, 0, SEEK_END) < 0) > serror("unio_open:lseek: %s", path); > /* fatal ? what about devices? 
*/ > > strncpy(entry->path, path, PATH_MAX); > entry->path[PATH_MAX] = 0; /* just in case */ > udebug(" unio_open: %d", entry->handle.fd); > return entry->handle.fd; > } > > > static void > unio_close(fl_entry *entry) > { > udebug(" unio_close: %d", entry->handle.fd); > if(entry->handle.fd != -1) > { > if(close(entry->handle.fd) == -1) > { > serror("close: %s", entry->path); > } > } > entry->path[0] = 0; > entry->handle.fd = -1; > } > > > static int > unio_sync(fl_entry *entry, int block) > { > /* > * Some systems may not have an fsync(2) call. > * The best you can do then would be to make this > * routine a noop which returns 0. > */ > int status = 0; > udebug(" unio_sync: %d %s", > entry->handle.fd, block ? "" : "non-block"); > if(block) > { > #ifndef NO_FSYNC > if(entry->handle.fd != -1) > status = fsync(entry->handle.fd); > if(status == -1) > { > serror("fsync: %s", entry->path); > } > #endif > entry->flags &= ~FL_NEEDS_SYNC; > } > return status; > } > > > /*ARGSUSED*/ > static int > unio_put(fl_entry *entry, const char *ignored, > const void *data, size_t sz) > { > int nwrote; > > TO_HEAD(entry); > udebug(" unio_dbufput: %d", entry->handle.fd); > > nwrote = (int) write(entry->handle.fd, data, sz); > if(nwrote != sz) > { > serror("unio_put: %s write error", > entry->path); > /* don't waste time syncing an errored entry */ > entry->flags &= ~FL_NEEDS_SYNC; > delete_entry(entry); > return -1; > } > /* else */ > entry->flags |= FL_NEEDS_SYNC; > return 0; > } > > > static struct fl_ops unio_ops = { > str_cmp, > unio_open, > unio_close, > unio_sync, > unio_put, > }; > > > /*ARGSUSED*/ > int > unio_prodput(const product *prodp, int argc, char **argv, > const void *ignored, size_t also_ignored) > { > int status = 0; > void *data = prodp->data; > size_t sz = prodp->info.sz; > fl_entry *entry = get_fl_entry(UNIXIO, argc, argv); > > udebug(" unio_prodput: %d %s", > entry == NULL ? 
-1 : entry->handle.fd , prodp->info.ident); > if(entry == NULL) > return -1; > > if(entry->flags & FL_STRIP) > { > data = dupstrip(prodp->data, prodp->info.sz, &sz); > if(data == NULL) > return -1; > } > > status = unio_put(entry, prodp->info.ident, data, sz); > if(data != prodp->data) > free(data); > if(status != -1) > status = atFinishedArgs(argc, argv, entry); > return status; > } > > > /* End UNIXIO */ > > /* Begin STDIO */ > static int > stdio_open(fl_entry *entry, int ac, char **av) > { > char *path; > int flags = (O_WRONLY|O_CREAT); > int fd; > char *mode = "a"; > /* extern FILE *fdopen(int, const char *); */ > > assert(ac > 0); > assert(av[ac -1] != NULL); > assert(*av[ac -1] != 0); > > entry->handle.stream = NULL; > > for(; ac > 1 && *av[0] == '-'; ac-- , av++) > { > if( strncmp(*av,"-overwrite",3) == 0) > { > flags |= O_TRUNC; > mode = "w"; > } > else if( strncmp(*av,"-strip",3) == 0) > { > entry->flags |= FL_STRIP; > } > } > > path = av[ac-1]; > > fd = mkdirs_open(path, flags, 0666); > if(fd == -1) > { > if(errno == EMFILE) > { > /* Too many open files */ > close_lru(0); > close_lru(0); > } > serror("mkdirs_open: %s", path); > return -1; > } > entry->handle.stream = fdopen(fd, mode); > if(entry->handle.stream == NULL) > { > serror("fdopen: %s", path); > return -1; > } > strncpy(entry->path, path, PATH_MAX); > entry->path[PATH_MAX] = 0; /* just in case */ > udebug(" stdio_open: %d", fileno(entry->handle.stream)); > return fileno(entry->handle.stream); > } > > > static void > stdio_close(fl_entry *entry) > { > udebug(" stdio_close: %d", > entry->handle.stream ? fileno(entry->handle.stream) : -1); > if(entry->handle.stream != NULL) > { > if(fclose(entry->handle.stream) == EOF) > { > serror("fclose: %s", entry->path); > } > } > entry->path[0] = 0; > entry->handle.stream = NULL; > } > > > /*ARGSUSED*/ > static int > stdio_sync(fl_entry *entry, int block) > { > int status = 0; > udebug(" stdio_sync: %d", > entry->handle.stream ? 
fileno(entry->handle.stream) : -1); > if(fflush(entry->handle.stream) == EOF) > { > serror("fflush: %s", entry->path); > status = -1; > } > entry->flags &= ~FL_NEEDS_SYNC; > return status; > } > > > /*ARGSUSED*/ > static int > stdio_put(fl_entry *entry, const char *ignored, > const void *data, size_t sz) > { > int nwrote; > > TO_HEAD(entry); > udebug(" stdio_dbufput: %d", fileno(entry->handle.stream)); > > /* else */ > nwrote = (int) fwrite(data, sz, 1, > entry->handle.stream); > if(nwrote != 1) > { > serror("stdio_put: %s fwrite error", > entry->path); > /* don't waste time syncing an errored entry */ > entry->flags &= ~FL_NEEDS_SYNC; > delete_entry(entry); > return -1; > } > /* else */ > entry->flags |= FL_NEEDS_SYNC; > return 0; > } > > > static struct fl_ops stdio_ops = { > str_cmp, > stdio_open, > stdio_close, > stdio_sync, > stdio_put, > }; > > > /*ARGSUSED*/ > int > stdio_prodput(const product *prodp, int argc, char **argv, > const void *ignored, size_t also_ignored) > { > int status = 0; > void *data = prodp->data; > size_t sz = prodp->info.sz; > fl_entry *entry = get_fl_entry(STDIO, argc, argv); > > udebug(" stdio_prodput: %d %s", > entry == NULL ? 
-1 : > fileno(entry->handle.stream) , prodp->info.ident); > if(entry == NULL) > return -1; > > if(entry->flags & FL_STRIP) > { > data = dupstrip(prodp->data, prodp->info.sz, &sz); > if(data == NULL) > return -1; > } > > status = stdio_put(entry, prodp->info.ident, data, sz); > if(data != prodp->data) > free(data); > if(status != -1) > status = atFinishedArgs(argc, argv, entry); > return status; > } > > /* End STDIO */ > > > /* Begin PIPE */ > static int > argcat(char *buf, int len, int argc, char **argv) > { > int cnt = 0; > char *cp; > > while(argc-- > 0 && (cp = *argv++) != NULL) > { > while(*cp != 0) > { > buf[cnt++] = *cp++; > if(cnt >= len) > break; > } > } > buf[cnt] = 0; > return cnt; > } > > > static int > argcat_cmp(fl_entry *entry, int argc, char **argv) > { > char buf[PATH_MAX+1]; > > assert(argc > 0); > assert(argv[0] != NULL); > assert(*argv[0] != 0); > > argcat(buf, sizeof(buf), argc, argv); > return(strcmp(buf, entry->path)); > } > > > /* > * Set to non-root privilege if possible. > * Do it in such a way that it is safe to fork. > * TODO: this is duplicated from ../server/priv.c > */ > void > endpriv(void) > { > const uid_t euid = geteuid(); > const uid_t uid = getuid(); > > /* if either euid or uid is unprivileged, use it */ > if(euid > 0) > setuid(euid); > else if(uid > 0) > setuid(uid); > > /* else warn??? or set to nobody??? 
*/ > } > > > static int > pipe_open(fl_entry *entry, int argc, char **argv) > { > int ac = argc; > char **av = argv; > int pfd[2]; > pid_t pid; > > assert(argc >= 1); > assert(argv[0] != NULL && *argv[0] != 0); > > entry->handle.pbuf = NULL; > > entry->flags |= FL_NOTRANSIENT; > /* handle any options */ > for(; ac > 1 && *av[0] == '-'; ac-- , av++) > { > if( strncmp(*av,"-transient",3) == 0) > { > entry->flags &= ~FL_NOTRANSIENT; > } > else if( strncmp(*av,"-strip",3) == 0) > { > entry->flags |= FL_STRIP; > } > } > > if( pipe(pfd) == -1 ) > { > if(errno == EMFILE) > { > /* Too many open files */ > close_lru(0); > close_lru(0); > } > serror("pipe"); > return -1; > } > > pid = fork(); > if(pid == -1) > { /* failure */ > serror("pipe_open: fork"); > goto error_out; > } > /* else */ > > if(pid == 0) > { /* child */ > > (void)signal(SIGCHLD, SIG_DFL); > (void)signal(SIGTERM, SIG_DFL); > > if( dup2(pfd[0] , 0) == -1 ) > { > serror("pipe: child dup2"); > _exit(-1); > } > close_all(); > /* Set up fd 1, stdout */ > (void) close(1); > { > int fd = open("/dev/null", O_WRONLY); > if(fd >= 0 && fd != 1) > { > (void) dup2(fd, 1); > (void) close(fd); > } > } > /* we leave stderr alone */ > > endpriv(); > > assert(av[ac] == NULL); > (void) execvp(av[0], &av[0]); > serror("pipe: execvp: %s", av[0]); > _exit(127); > } > /* else, parent */ > > if(close(pfd[0]) == -1) > { > /* How can this ever happen? */ > serror("pipe: parent close"); > /* not fatal ? 
*/ > } > > /* set up pfd[1] as output descriptor */ > entry->handle.pbuf = new_pbuf(pfd[1], 512); /* _POSIX_PIPE_BUF */ > if(entry->handle.pbuf == NULL) > goto error_out; > > entry->private = pid; > argcat(entry->path, PATH_MAX, argc, argv); > > udebug(" pipe_open: %d %d", pfd[1], pid); > > return pfd[1]; > > error_out: > (void)close(pfd[1]); > (void)close(pfd[0]); > return -1; > } > > > /* > * Used by reap() to delete a PIPE entry > */ > static void > tag_pid_entry(pid_t pid) > { > fl_entry *entry = NULL; > > if(pid == -1) > return; > > for(entry = thefl->tail; entry != NULL; > entry = entry->prev ) > { > if(entry->type == PIPE > && pid == entry->private) > break; > } > > if(entry != NULL) > { > udebug(" reap(%d): %s", pid, entry->path); > /* mark the entry as dead */ > entry->private = -1; > /* and delete it. Unsafe in interrupt context */ > delete_entry(entry); > } > else > { > udebug(" reap(%d): proc not on filel", pid); > } > } > > > int > reap(pid_t pid, int options) > { > pid_t wpid = 0; > int status = 0; > > #ifndef NO_WAITPID > wpid = waitpid(pid, &status, options); > #else > if(options == 0) > wpid = wait(&status); > /* customize here for older systems, use wait3 or whatever */ > #endif > if(wpid == -1) > { > if(!(errno == ECHILD && pid == -1)) /* Only complain when > relevant */ > serror("waitpid"); > return -1; > } > /* else */ > > if(wpid != 0) > { > > #if !defined(WIFSIGNALED) && !defined(WIFEXITED) > #error "Can't decode wait status" > #endif > > #if defined(WIFSTOPPED) > if(WIFSTOPPED(status)) > { > unotice("child %d stopped by signal %d", > wpid, WSTOPSIG(status)); > } > else > #endif > #if defined(WIFSIGNALED) > if(WIFSIGNALED(status)) > { > tag_pid_entry(wpid); > unotice("child %d terminated by signal %d", > wpid, WTERMSIG(status)); > } > else > #endif > #if defined(WIFEXITED) > if(WIFEXITED(status)) > { > tag_pid_entry(wpid); > if(WEXITSTATUS(status) != 0) > unotice("child %d exited with status %d", > wpid, WEXITSTATUS(status)); > } > #endif > } 
> > return wpid; > } > > > static int > pipe_sync(fl_entry *entry, int block) > { > int status = ENOERR; > udebug(" pipe_sync: %d %s", > entry->handle.pbuf ? entry->handle.pbuf->pfd : -1, > block ? "" : "non-block"); > status = pbuf_flush(entry->handle.pbuf, block, pipe_timeo); > if(status != ENOERR) > entry->flags &= ~FL_NEEDS_SYNC; > return status; > } > > > static void > pipe_close(fl_entry *entry) > { > pid_t pid = (pid_t)entry->private; > int pfd = -1; > > udebug(" pipe_close: %d, %d", > entry->handle.pbuf ? entry->handle.pbuf->pfd : -1, pid); > if(entry->handle.pbuf != NULL) > { > if(pid >= 0 && (entry->flags & FL_NEEDS_SYNC)) > { > (void) pipe_sync(entry, TRUE); > } > pfd = entry->handle.pbuf->pfd; > free_pbuf(entry->handle.pbuf); > } > if(pfd != -1) > { > if(close(pfd) == -1) > { > serror("pipe close: %s", entry->path); > } > /* > * The close should cause termination of the child > * as the child reads EOF. The child is wait()'ed > * upon asynchronous in a SIGCHLD handler. > */ > } > entry->path[0] = 0; > entry->handle.pbuf = NULL; > entry->private = 0; > } > > > /* > * N.B. New return convention: > * returns ENOERR (0) or, on failure, the errno. > */ > /*ARGSUSED*/ > static int > pipe_put(fl_entry *entry, const char *ignored, > const void *data, size_t sz) > { > int status = ENOERR; > > udebug(" pipe_put: %d", > entry->handle.pbuf ? 
entry->handle.pbuf->pfd : -1); > TO_HEAD(entry); > if(entry->handle.pbuf == NULL) > return EINVAL; > > status = pbuf_write(entry->handle.pbuf, > data, sz, pipe_timeo); > > if(status != ENOERR) > { > uerror("pipe_dbufput: %s write error", > entry->path); > /* don't waste time syncing an errored entry */ > entry->flags &= ~FL_NEEDS_SYNC; > delete_entry(entry); > return status; > } > entry->flags |= FL_NEEDS_SYNC; > return ENOERR; > } > > > static struct fl_ops pipe_ops = { > argcat_cmp, > pipe_open, > pipe_close, > pipe_sync, > pipe_put, > }; > > > /*ARGSUSED*/ > int > pipe_prodput(const product *prodp, int argc, char **argv, > const void *ignored, size_t also_ignored) > { > int status = 0; > void *data = prodp->data; > size_t sz = prodp->info.sz; > fl_entry *entry = get_fl_entry(PIPE, argc, argv); > > udebug(" pipe_prodput: %d %s", > (entry != NULL && entry->handle.pbuf) > ? entry->handle.pbuf->pfd : -1, > prodp->info.ident); > > if(entry == NULL) > return -1; > > if(entry->flags & FL_STRIP) > { > data = dupstrip(prodp->data, prodp->info.sz, &sz); > if(data == NULL) > return -1; > } > > status = pipe_put(entry, prodp->info.ident, data, sz); > if(status == EPIPE) > { > /* > * In case the decoder exited and we haven't yet reaped, > * try again once. 
> */ > uerror("pipe_prodput: trying again"); > entry = get_fl_entry(PIPE, argc, argv); > if(entry == NULL) > return -1; > status = pipe_put(entry, prodp->info.ident, data, sz); > } > if(data != prodp->data) > free(data); > > if(status != ENOERR) > return -1; > > return atFinishedArgs(argc, argv, entry);; > } > > > /*ARGSUSED*/ > int > spipe_prodput(const product *prod, int argc, char **argv, > const void *ignored, size_t also_ignored) > { > fl_entry *entry; > char *buffer; > size_t len; > unsigned long offset; > int status = ENOERR; > > typedef union { > unsigned long u_long; > char cu_long[sizeof(unsigned long)]; > } conv; > conv key_len; > conv data_len; > conv sync; > > > entry = get_fl_entry(PIPE, argc, argv); > udebug(" spipe_prodput: %d %s", > (entry != NULL && entry->handle.pbuf) > ? entry->handle.pbuf->pfd : -1, > prod->info.ident); > if(entry == NULL) > return -1; > > /* > **--------------------------------------------------------- > ** Place the following information into dbuf_val for > ** writing to the pipe: > ** > ** unsigned long SPIPE_SYNC > ** unsigned long key_len > ** char *key > ** unsigned long data_len (this includes ETX/RS makers) > ** char *data > ** char SPIPE_ETX > ** char SPIPE_RS > ** > ** First, get lengths of key and data to allocate space > ** in a temporary buffer. 
> ** > **--------------------------------------------------------- > */ > #ifndef SPIPE_SYNC > #define SPIPE_SYNC 0x1DFCCF1A > #endif /* !SPIPE_SYNC */ > > #ifndef SPIPE_ETX > #define SPIPE_ETX '\003' > #endif /* !SPIPE_ETX */ > > #ifndef SPIPE_RS > #define SPIPE_RS '\036' > #endif /* !SPIPE_ETX */ > > key_len.u_long = strlen(prod->info.ident); > data_len.u_long = prod->info.sz + 2; > sync.u_long = SPIPE_SYNC; > > len = (unsigned ) (sizeof(unsigned long) + > sizeof(key_len.cu_long) + strlen(prod->info.ident) + > sizeof(data_len.cu_long) + prod->info.sz + 2); > > buffer = calloc(1, len); > > /*--------------------------------------------------------- > ** Now place the individual items into the buffer > **-------------------------------------------------------*/ > > offset = 0; > > memcpy (buffer+offset, sync.cu_long, sizeof(sync.cu_long)); > offset = offset + sizeof(unsigned long); > > memcpy(buffer+offset, key_len.cu_long, sizeof(key_len.cu_long)); > offset = offset + sizeof(key_len); > > memcpy(buffer+offset, prod->info.ident, key_len.u_long); > offset = offset + key_len.u_long; > > memcpy(buffer+offset, data_len.cu_long, sizeof(data_len.cu_long)); > offset = offset + sizeof(data_len); > > memcpy(buffer+offset, prod->data, prod->info.sz); > > /*--------------------------------------------------------- > ** Terminate the message with ETX & RS > **-------------------------------------------------------*/ > buffer[len - 2] = SPIPE_ETX; > buffer[len - 1] = SPIPE_RS; > > uerror("spipe_prodput: size = %d\t%d %d %d", prod->info.sz, buffer[len > -3], > buffer[len -2], buffer[len -1]); > > /*--------------------------------------------------------- > ** Send this stuff and tidy up > **-------------------------------------------------------*/ > status = pipe_put(entry, prod->info.ident, buffer, len); > if(status == EPIPE) > { > /* > * In case the decoder exited and we haven't yet reaped, > * try again once. 
> */ > uerror("spipe_prodput: trying again"); > entry = get_fl_entry(PIPE, argc, argv); > if(entry == NULL) > return -1; > status = pipe_put(entry, prod->info.ident, buffer, len); > } > free(buffer); > if(status != ENOERR) > return -1; > > return atFinishedArgs(argc, argv, entry); > > } > > > int > xpipe_prodput(const product *prod, int argc, char **argv, > const void *xprod, size_t xlen) > { > int status = ENOERR; > fl_entry *entry; > > entry = get_fl_entry(PIPE, argc, argv); > udebug(" xpipe_prodput: %d %s", > (entry != NULL && entry->handle.pbuf) > ? entry->handle.pbuf->pfd : -1, prod->info.ident); > if(entry == NULL) > return -1; > > status = pipe_put(entry, prod->info.ident, xprod, xlen); > if(status == EPIPE) > { > /* > * In case the decoder exited and we haven't yet reaped, > * try again once. > */ > uerror("xpipe_prodput: trying again"); > entry = get_fl_entry(PIPE, argc, argv); > if(entry == NULL) > return -1; > status = pipe_put(entry, prod->info.ident, xprod, xlen); > } > > if(status != ENOERR) > return -1; > > return atFinishedArgs(argc, argv, entry); > } > /* End PIPE */ > > > #ifndef NO_DB > # ifdef USE_GDBM > /* namespace conflict with gdbm_open, etc, so using prefix ldmdb_ */ > > > /* > * called in gdbm when it tries to punt > * If we didn't provide this function, gdbm would print the > * message and call exit(-1). 
> */ > static void > ldmdb_fatal ( char * str) > { > serror("%s", str); > } > > > /* > * two or 3 args: > * pathname flag [dblocksize] > * if flag is 0 open read/write/create, otherwise open readonly > */ > static int > ldmdb_open(fl_entry *entry, int argc, char **argv) > { > char *path; > GDBM_FILE db; > long tmp = 0; > int read_write = GDBM_WRCREAT; > /* default: choose to optimize for space over time */ > #define DEFAULT_DBLOCKSIZE 512 > int dblocksize = DEFAULT_DBLOCKSIZE; > > entry->handle.db = NULL; > path = argv[0]; > read_write = atoi(argv[1]); > > if(argc > 2) > { > if ( (tmp = atoi(argv[2])) > 0 ) > { > dblocksize = (int)tmp; > } > else > { > uerror("%s: ldmdb_open: -dblocksize %s invalid", > path, argv[1] ); > } > } > > if(read_write != GDBM_READER) /* not read only */ > { > /* create directories if needed */ > if(diraccess(path, (R_OK | W_OK), !0) == -1) > { > serror("Couldn't access directories leading to %s", > path); > return -1; > } > } > > db = gdbm_open(path, dblocksize, read_write, 0664, ldmdb_fatal); > if(db == NULL) > { > if(errno == EMFILE) > { > /* Too many open files */ > close_lru(0); > close_lru(0); > } > serror("gdbm_open: %s", path); > return -1; > } > entry->handle.db = db; > entry->private = read_write; > strncpy(entry->path, path, PATH_MAX); > entry->path[PATH_MAX] = 0; /* just in case */ > udebug(" ldmdb_open: %s", entry->path); > return 0; > } > > > static void > ldmdb_close(fl_entry *entry) > { > udebug(" ldmdb_close: %s", entry->path); > if(entry->handle.db != NULL) > gdbm_close(entry->handle.db); > entry->private = 0; > entry->path[0] = 0; > entry->handle.db = NULL; > } > > > static int > ldmdb_cmp(fl_entry *entry, int argc, char **argv) > { > char *path; > int read_write; > int cmp; > > assert(argc > 1); > assert(argv[0] != NULL); > assert(*argv[0] != 0); > > path = argv[0]; > read_write = atoi(argv[1]); > > cmp = strcmp(path, entry->path); > if(cmp == 0) > { > if(read_write != GDBM_READER && > read_write != entry->private) > { 
> /* > * the flags don't match, so close and reopen > */ > ldmdb_close(entry); > if(ldmdb_open(entry, argc, argv) < 0) > cmp = -1; > } > } > return cmp; > } > > > /*ARGSUSED*/ > static int > ldmdb_sync(fl_entry *entry, int block) > { > /* there is no gdbm_sync */ > udebug(" ldmdb_sync: %s", > entry->handle.db ? entry->path : ""); > entry->flags &= ~FL_NEEDS_SYNC; > return(0); > } > > > /*ARGSUSED*/ > static int > ldmdb_put(fl_entry *entry, const char *keystr, > const void *data, size_t sz) > { > datum key, content; > int status; > > key.dptr = (char *) keystr /* N.B. cast away const */; > key.dsize = (int) strlen(key.dptr) + 1; /* include the \0 */ > > content.dptr = (char *) data; /* N.B. cast away const */ > content.dsize = (int)sz; > > #if defined(DB_CONCAT) && !DB_XPROD > /* concatenate duplicate keys */ > /* > * Code for concatenating data when the key is a duplicate. > * Contributed 9/17/91 JCaron/PNeilley/LCarson > * Wrecks idea of "product" when applied at this layer, so > * only define DB_CONCAT when DB_XPROD is not defined. 
> */ > > status = gdbm_store(entry->handle.db, key, content, GDBM_INSERT); > if (status == 1 ) > { > int size; > datum old_stuff, new_stuff; > old_stuff = gdbm_fetch(entry->handle.db, key); > udebug("\tConcatenating data under key %s", key.dptr); > if (NULL == old_stuff.dptr) > { > serror("ldmdb_prodput: Inconsistent Duplicate Key storage"); > return -1; > } > size = content.dsize+old_stuff.dsize; > if (NULL == (new_stuff.dptr = malloc(size))) > { > serror("ldmdb_prodput: malloc failed"); > free (old_stuff.dptr); > return -1; > } > memcpy(new_stuff.dptr, old_stuff.dptr, old_stuff.dsize); > memcpy(&new_stuff.dptr[old_stuff.dsize], content.dptr, > content.dsize); > new_stuff.dsize = size; > status = gdbm_store(entry->handle.db, key, new_stuff, > GDBM_REPLACE); > free (new_stuff.dptr); > free (old_stuff.dptr); > } > > #else > /* TODO: replace flag */ > status = gdbm_store(entry->handle.db, key, content, GDBM_REPLACE); > #endif > return status; > } > > # else /*USE_GDBM*/ > > /* > * two or 3 args: > * pathname flag [dblocksize] > * if flag is 0 open read/write/create, otherwise open readonly > */ > static int > ldmdb_open(fl_entry *entry, int ac, char **av) > { > const char *path; > int flags = (O_WRONLY|O_CREAT); > > assert(ac > 0); > assert(av[ac -1] != NULL); > assert(*av[ac -1] != 0); > > entry->handle.db = NULL; > > for(; ac > 1 && *av[0] == '-'; ac-- , av++) > { > if( strncmp(*av,"-overwrite",3) == 0) > { > flags |= O_TRUNC; > } > else if( strncmp(*av,"-strip",3) == 0) > { > entry->flags |= FL_STRIP; > } > } > > path = av[ac-1]; > > /* create directories if needed */ > if(diraccess(path, (R_OK | W_OK), !0) == -1) > { > serror("Couldn't access directories leading to %s", path); return -1; > } > > entry->handle.db = dbm_open(path, flags, 0666); > if(entry->handle.db == NULL) > { > if(errno == EMFILE) > { > /* Too many open files */ > close_lru(0); > close_lru(0); > close_lru(0); > close_lru(0); > } > serror("ldmdb_open: %s", path); > return -1; > } > 
strncpy(entry->path, path, PATH_MAX); > entry->path[PATH_MAX] = 0; /* just in case */ > udebug(" ldmdb_open: %s", entry->path); > return 0; > } > > > static void > ldmdb_close(fl_entry *entry) > { > udebug(" ldmdb_close: %s", entry->path); > if(entry->handle.db != NULL) > dbm_close(entry->handle.db); > entry->private = 0; > entry->path[0] = 0; > entry->handle.db = NULL; > } > > > static int > ldmdb_cmp(fl_entry *entry, int argc, char **argv) > { > return str_cmp(entry, argc, argv); > } > > > /*ARGSUSED*/ > static int > ldmdb_sync(fl_entry *entry, int block) > { > /* there is no dbm_sync */ > udebug(" ldmdb_sync: %s", > entry->handle.db ? entry->path : ""); > entry->flags &= ~FL_NEEDS_SYNC; > return(0); > } > > > /*ARGSUSED*/ > static int > ldmdb_put(fl_entry *entry, const char *keystr, > const void *data, size_t sz) > { > datum key, content; > int status; > > key.dptr = (char *) keystr /* N.B. cast away const */; > key.dsize = (int) strlen(key.dptr) + 1; /* include the \0 */ > > content.dptr = (char *) data; /* N.B. cast away const */ > content.dsize = (int)sz; > > #if defined(DB_CONCAT) && !DB_XPROD > /* concatenate duplicate keys */ > /* > * Code for concatenating data when the key is a duplicate. > * Contributed 9/17/91 JCaron/PNeilley/LCarson > * Wrecks idea of "product" when applied at this layer, so > * only define DB_CONCAT when DB_XPROD is not defined. 
> */ > > status = dbm_store(entry->handle.db, key, content, DBM_INSERT); > if (status == 1 ) > { > int size; > datum old_stuff, new_stuff; > old_stuff = dbm_fetch(entry->handle.db, key); > udebug("\tConcatenating data under key %s", key.dptr); > if (NULL == old_stuff.dptr) > { > serror("ldmdb_prodput: Inconsistent Duplicate Key storage"); > return -1; > } > size = content.dsize+old_stuff.dsize; > if (NULL == (new_stuff.dptr = malloc(size))) > { > serror("ldmdb_prodput: malloc failed"); > free (old_stuff.dptr); > return -1; > } > memcpy(new_stuff.dptr, old_stuff.dptr, old_stuff.dsize); > memcpy(&((char *)new_stuff.dptr)[old_stuff.dsize], > content.dptr, content.dsize); > new_stuff.dsize = size; > status = dbm_store(entry->handle.db, key, new_stuff, DBM_REPLACE); > free (new_stuff.dptr); > free (old_stuff.dptr); > } > > #else > /* TODO: replace flag */ > status = dbm_store(entry->handle.db, key, content, DBM_REPLACE); > #endif > return status; > } > # endif /*USE_GDBM*/ > > > static struct fl_ops ldmdb_ops = { > ldmdb_cmp, > ldmdb_open, > ldmdb_close, > ldmdb_sync, > ldmdb_put, > }; > > > /*ARGSUSED*/ > int > ldmdb_prodput(const product *prod, int ac, char **av, > const void *xp, size_t xlen) > { > fl_entry *entry; > int status; > int closeflag = 0; > > const char *keystr; > char *dblocksizep = NULL; > char *gdbm_wrcreat = "2"; > > for(; ac > 1 && *av[0] == '-'; ac-- , av++) > { > if( strncmp(*av,"-close",3) == 0) > closeflag = !0; > else if( strncmp(*av,"-dblocksize",3) == 0) > { > ac--; av++; > dblocksizep = *av; > } else > uerror("dbfile: Invalid argument %s", *av); > > } > > { > /* set up simple argc, argv for ldmdb_open */ > int argc = 0; > char *argv[4]; > argv[argc++] = av[0]; > argv[argc++] = gdbm_wrcreat; > if(dblocksizep != NULL) > argv[argc++] = dblocksizep; > argv[argc] = NULL; > entry = get_fl_entry(FT_DB, argc, argv); > udebug(" ldmdb_prodput: %s %s", > entry == NULL ? 
"" : entry->path, prod->info.ident); > if(entry == NULL) return -1; > } > > ac--; av++; > > if(ac >= 0 && av[0] != NULL && *av[0] != 0) > { > /* use command line arg as key */ > keystr = av[0]; > } > else > { > /* use product->ident */ > keystr = prod->info.ident; > } > > #if DB_XPROD > status = ldmdb_put(entry, keystr, xp, xlen); > #else > status = ldmdb_put(entry, keystr, prod->data, prod->info.sz); > #endif > > if(status == -1) > { > uerror("db_put: %s error for %s, dbkey %s", > entry->path, prod->info.ident, keystr); > } > if(closeflag || status == -1) > { > delete_entry(entry); > } > > return status; > } > > #endif /* !NO_DB */ > > > static fl_entry * > new_fl_entry(ft_t type, int argc, char **argv) > { > fl_entry *entry = NULL; > > entry = Alloc(1, fl_entry); > if(entry == NULL) > { > serror("new_fl_entry: malloc"); > return NULL; > } > entry->path[0] = 0; > > switch (type) { > case UNIXIO : > entry->ops = &unio_ops; > break; > case STDIO : > entry->ops = &stdio_ops; > break; > case PIPE : > entry->ops = &pipe_ops; > break; > case FT_DB : > #ifndef NO_DB > entry->ops = &ldmdb_ops; > #else > uerror("new_fl_entry: DB type not enabled"); > goto err; > /*NOTREACHED*/ > #endif /* !NO_DB */ > break; > default : > uerror("new_fl_entry: unknown type %d", type); > goto err; > } > > entry->flags = 0; > entry->type = type; > entry->next = NULL; > entry->prev = NULL; > entry->path[0] = 0; > entry->private = 0; > > if( entry->ops->open(entry, argc, argv) == -1 ) > goto err; > entry->serial = newSerial(); > > return entry; > err : > free_fl_entry(entry); > return NULL; > } > > --------------070609090501010602000202--