GNU/Linux >> Tutoriels Linux >  >> Linux

fifo Linux non bloquant (journalisation à la demande)

C'est un (très) vieux fil, mais j'ai rencontré un problème similaire récemment. En fait, ce dont j'avais besoin, c'est d'un clonage de stdin vers stdout avec une copie dans un tube non bloquant. la fte proposée dans la première réponse y a vraiment aidé, mais était (pour mon cas d'utilisation) trop volatile. Cela signifie que j'ai perdu des données que j'aurais pu traiter si j'y étais parvenu à temps.

Le scénario auquel j'étais confronté est que j'ai un processus (some_process) qui agrège certaines données et écrit ses résultats toutes les trois secondes sur stdout. La configuration (simplifiée) ressemblait à ceci (dans la configuration réelle, j'utilise un canal nommé):

some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz

Maintenant, raw_data.gz doit être compressé et doit être complet. ftee fait très bien ce travail. Mais le tuyau que j'utilise au milieu était trop lent pour saisir les données évacuées - mais il était assez rapide pour tout traiter s'il pouvait y accéder, ce qui a été testé avec un té normal. Cependant, un té normal bloque si quelque chose arrive au tuyau sans nom, et comme je veux pouvoir me connecter à la demande, le té n'est pas une option. Retour au sujet :Cela s'est amélioré lorsque j'ai mis un tampon entre les deux, ce qui a donné :

some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz

Mais cela perdait encore des données que j'aurais pu traiter. Je suis donc allé de l'avant et j'ai étendu le ftee proposé auparavant à une version tamponnée (bftee). Il a toujours les mêmes propriétés, mais utilise un tampon interne (inefficace ?) en cas d'échec d'une écriture. Il perd toujours des données si le tampon est plein, mais cela fonctionne parfaitement dans mon cas. Comme toujours, il y a beaucoup de place à l'amélioration, mais comme j'ai copié le code ici, j'aimerais le partager avec les personnes qui pourraient en avoir besoin.

/* bftee - clone stdin to stdout and to a buffered, non-blocking pipe 
    (c) [email protected]
    (c) [email protected]
    WTFPL Licence */

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <fcntl.h>
    #include <errno.h>
    #include <signal.h>
    #include <unistd.h>

    // the number of sBuffers that are being held at a maximum
    #define BUFFER_SIZE 4096
    #define BLOCK_SIZE 2048

    typedef struct {
      char data[BLOCK_SIZE];
      int bytes;
    } sBuffer;

    typedef struct {
      sBuffer *data;  //array of buffers
      int bufferSize; // number of buffer in data
      int start;      // index of the current start buffer
      int end;        // index of the current end buffer
      int active;     // number of active buffer (currently in use)
      int maxUse;     // maximum number of buffers ever used
      int drops;      // number of discarded buffer due to overflow
      int sWrites;    // number of buffer written to stdout
      int pWrites;    // number of buffers written to pipe
    } sQueue;

    void InitQueue(sQueue*, int);              // initialized the Queue
    void PushToQueue(sQueue*, sBuffer*, int);  // pushes a buffer into Queue at the end 
    sBuffer *RetrieveFromQueue(sQueue*);       // returns the first entry of the buffer and removes it or NULL is buffer is empty
    sBuffer *PeakAtQueue(sQueue*);             // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer
    void ShrinkInQueue(sQueue *queue, int);    // shrinks the first entry of the buffer by n-bytes. Buffer is removed if it is empty
    void DelFromQueue(sQueue *queue);          // removes the first entry of the queue

    static void sigUSR1(int);                  // signal handled for SUGUSR1 - used for stats output to stderr
    static void sigINT(int);                   // signla handler for SIGKILL/SIGTERM - allows for a graceful stop ?

    sQueue queue;                              // Buffer storing the overflow
    volatile int quit;                         // for quiting the main loop

    int main(int argc, char *argv[])
    {   
        int readfd, writefd;
        struct stat status;
        char *fifonam;
        sBuffer buffer;
        ssize_t bytes;
        int bufferSize = BUFFER_SIZE;

        signal(SIGPIPE, SIG_IGN);
        signal(SIGUSR1, sigUSR1);
        signal(SIGTERM, sigINT);
        signal(SIGINT,  sigINT);

        /** Handle commandline args and open the pipe for non blocking writing **/

        if(argc < 2 || argc > 3)
        {   
            printf("Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n"
                   "FIFO - path to a named pipe, required argument\n"
                   "BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]);
            exit(EXIT_FAILURE);
        }

        fifonam = argv[1];
        if (argc == 3) {
          bufferSize = atoi(argv[2]);
          if (bufferSize == 0) bufferSize = BUFFER_SIZE;
        }

        readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
        if(-1==readfd)
        {   
            perror("bftee: readfd: open()");
            exit(EXIT_FAILURE);
        }

        if(-1==fstat(readfd, &status))
        {
            perror("bftee: fstat");
            close(readfd);
            exit(EXIT_FAILURE);
        }

        if(!S_ISFIFO(status.st_mode))
        {
            printf("bftee: %s in not a fifo!\n", fifonam);
            close(readfd);
            exit(EXIT_FAILURE);
        }

        writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
        if(-1==writefd)
        {
            perror("bftee: writefd: open()");
            close(readfd);
            exit(EXIT_FAILURE);
        }

        close(readfd);


        InitQueue(&queue, bufferSize);
        quit = 0;

        while(!quit)
        {
            // read from STDIN
            bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data));

            // if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading
            if (bytes < 0 && errno == EINTR) continue;
            if (bytes <= 0) break;

            // save the number if read bytes in the current buffer to be processed
            buffer.bytes = bytes;

            // this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux
            // thus, this cannot be interrupted. however, to be save this should handle the error cases of partial or interrupted write none the less.
            bytes = write(STDOUT_FILENO, buffer.data, buffer.bytes);
            queue.sWrites++;

            if(-1==bytes) {
                perror("ftee: writing to stdout");
                break;
            }

            sBuffer *tmpBuffer = NULL;

            // if the queue is empty (tmpBuffer gets set to NULL) the this does nothing - otherwise it tries to write
            // the buffered data to the pipe. This continues until the Buffer is empty or the write fails.
            // NOTE: bytes cannot be -1  (that would have failed just before) when the loop is entered. 
            while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) {
               // write the oldest buffer to the pipe
               bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);

               // the  written bytes are equal to the buffer size, the write is successful - remove the buffer and continue
               if (bytes == tmpBuffer->bytes) {
                 DelFromQueue(&queue);
                 queue.pWrites++;
               } else if (bytes > 0) {
                 // on a positive bytes value there was a partial write. we shrink the current buffer
                 //  and handle this as a write failure
                 ShrinkInQueue(&queue, bytes);
                 bytes = -1;
               }
            }
            // There are several cases here:
            // 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe
            // 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger 0. also try to write the data
            // 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped.
            if (bytes != -1) bytes = write(writefd, buffer.data, buffer.bytes);

            // again, there are several cases what can happen here
            // 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens
            // 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue
            // 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue
            if (bytes != buffer.bytes)
              PushToQueue(&queue, &buffer, bytes);
            else 
              queue.pWrites++;
        }

        // once we are done with STDIN, try to flush the buffer to the named pipe
        if (queue.active > 0) {
           //set output buffer to block - here we wait until we can write everything to the named pipe
           // --> this does not seem to work - just in case there is a busy loop that waits for buffer flush aswell. 
           int saved_flags = fcntl(writefd, F_GETFL);
           int new_flags = saved_flags & ~O_NONBLOCK;
           int res = fcntl(writefd, F_SETFL, new_flags);

           sBuffer *tmpBuffer = NULL;
           //TODO: this does not handle partial writes yet
           while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) {
             int bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);
             if (bytes != -1) DelFromQueue(&queue);
           }
        }

        close(writefd);

    }


    /** init a given Queue **/
    void InitQueue (sQueue *queue, int bufferSize) {
      queue->data = calloc(bufferSize, sizeof(sBuffer));
      queue->bufferSize = bufferSize;
      queue->start = 0;
      queue->end = 0;
      queue->active = 0;
      queue->maxUse = 0;
      queue->drops = 0;
      queue->sWrites = 0;
      queue->pWrites = 0;
    }

    /** push a buffer into the Queue**/
    void PushToQueue(sQueue *queue, sBuffer *p, int offset)
    {

        if (offset < 0) offset = 0;      // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead
        if (offset == p->bytes) return;  // in this case there are 0 bytes to add to the queue. Nothing to write

        // this should never happen - offset cannot be bigger than the buffer itself. Panic action
        if (offset > p->bytes) {perror("got more bytes to buffer than we read\n"); exit(EXIT_FAILURE);}

        // debug output on a partial write. TODO: remove this line
        // if (offset > 0 ) fprintf(stderr, "partial write to buffer\n");

        // copy the data from the buffer into the queue and remember its size
        memcpy(queue->data[queue->end].data, p->data + offset , p->bytes-offset);
        queue->data[queue->end].bytes = p->bytes - offset;

        // move the buffer forward
        queue->end = (queue->end + 1) % queue->bufferSize;

        // there is still space in the buffer
        if (queue->active < queue->bufferSize)
        {
            queue->active++;
            if (queue->active > queue->maxUse) queue->maxUse = queue->active;
        } else {
            // Overwriting the oldest. Move start to next-oldest
            queue->start = (queue->start + 1) % queue->bufferSize;
            queue->drops++;
        }
    }

    /** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/
    sBuffer *RetrieveFromQueue(sQueue *queue)
    {
        if (!queue->active) { return NULL; }

        queue->start = (queue->start + 1) % queue->bufferSize;
        queue->active--;
        return &(queue->data[queue->start]);
    }

    /** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/
    sBuffer *PeakAtQueue(sQueue *queue)
    {
        if (!queue->active) { return NULL; }
        return &(queue->data[queue->start]);
    }

    /*** Shrinks the oldest entry i the Queue by bytes. Removes the entry if buffer of the oldest entry runs empty*/
    void ShrinkInQueue(sQueue *queue, int bytes) {

      // cannot remove negative amount of bytes - this is an error case. Ignore it
      if (bytes <= 0) return;

      // remove the entry if the offset is equal to the buffer size
      if (queue->data[queue->start].bytes == bytes) {
        DelFromQueue(queue);
        return;
      };

      // this is a partial delete
      if (queue->data[queue->start].bytes > bytes) {
        //shift the memory by the offset
        memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes);
        queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes;
        return;
      }

      // panic is the are to remove more than we have the buffer
      if (queue->data[queue->start].bytes < bytes) {
        perror("we wrote more than we had - this should never happen\n");
        exit(EXIT_FAILURE);
        return;
      }
    }

    /** delete the oldest entry from the queue. Do nothing if the Queue is empty **/
    void DelFromQueue(sQueue *queue)
    {
        if (queue->active > 0) {
          queue->start = (queue->start + 1) % queue->bufferSize;
          queue->active--;
        }
    }

    /** Stats output on SIGUSR1 **/
    static void sigUSR1(int signo) {
      fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i\n", queue.active, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops);
    }

    /** handle signal for terminating **/
    static void sigINT(int signo) {
      quit++;
      if (quit > 1) exit(EXIT_FAILURE);
    }

Cette version prend un argument supplémentaire (facultatif) qui spécifie le nombre de blocs à mettre en mémoire tampon pour le tube. Mon exemple d'appel ressemble maintenant à ceci :

some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz

résultant en 16384 blocs à mettre en mémoire tampon avant que les rejets ne se produisent. cela utilise environ 32 Mo de mémoire en plus, mais... qui s'en soucie ?

Bien sûr, dans l'environnement réel, j'utilise un canal nommé afin que je puisse attacher et détacher au besoin. Il ressemble à ceci :

mkfifo named_pipe
some_process | bftee named_pipe 16384 | gzip > raw_data.gz &
cat named_pipe | onlineAnalysis.pl > results

De plus, le processus réagit aux signaux comme suit :SIGUSR1 -> affiche les compteurs sur STDERRSIGTERM, SIGINT -> le premier quitte la boucle principale et vide le tampon dans le tube, le second termine le programme immédiatement.

Peut-être que cela aidera quelqu'un à l'avenir... Profitez


Inspiré par votre question, j'ai écrit un programme simple qui vous permettra de faire ceci :

$ myprogram 2>&1 | ftee /tmp/mylog

Il se comporte de la même manière que tee mais clone le stdin vers stdout et vers un canal nommé (une exigence pour l'instant) sans bloquer. Cela signifie que si vous voulez vous connecter de cette façon, il se peut que vous perdiez vos données de journal, mais je suppose que c'est acceptable dans votre scénario.L'astuce consiste à bloquer SIGPIPE signal et d'ignorer l'erreur lors de l'écriture dans un fifo cassé. Cet échantillon peut être optimisé de différentes manières, bien sûr, mais jusqu'à présent, il fait le travail, je suppose.

/* ftee - clone stdin to stdout and to a named pipe 
(c) [email protected]
WTFPL Licence */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

int main(int argc, char *argv[])
{
    int readfd, writefd;
    struct stat status;
    char *fifonam;
    char buffer[BUFSIZ];
    ssize_t bytes;
    
    signal(SIGPIPE, SIG_IGN);

    if(2!=argc)
    {
        printf("Usage:\n someprog 2>&1 | %s FIFO\n FIFO - path to a"
            " named pipe, required argument\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    fifonam = argv[1];

    readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
    if(-1==readfd)
    {
        perror("ftee: readfd: open()");
        exit(EXIT_FAILURE);
    }

    if(-1==fstat(readfd, &status))
    {
        perror("ftee: fstat");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    if(!S_ISFIFO(status.st_mode))
    {
        printf("ftee: %s in not a fifo!\n", fifonam);
        close(readfd);
        exit(EXIT_FAILURE);
    }

    writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
    if(-1==writefd)
    {
        perror("ftee: writefd: open()");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    close(readfd);

    while(1)
    {
        bytes = read(STDIN_FILENO, buffer, sizeof(buffer));
        if (bytes < 0 && errno == EINTR)
            continue;
        if (bytes <= 0)
            break;

        bytes = write(STDOUT_FILENO, buffer, bytes);
        if(-1==bytes)
            perror("ftee: writing to stdout");
        bytes = write(writefd, buffer, bytes);
        if(-1==bytes);//Ignoring the errors
    }
    close(writefd); 
    return(0);
}

Vous pouvez le compiler avec cette commande standard :

$ gcc ftee.c -o ftee

Vous pouvez le vérifier rapidement en exécutant, par exemple :

$ ping www.google.com | ftee /tmp/mylog

$ cat /tmp/mylog

Notez également - ce n'est pas un multiplexeur. Vous ne pouvez avoir qu'un seul processus faisant $ cat /tmp/mylog à la fois.


Cela ressemble à bash <> l'opérateur de redirection (3.6.10 Ouverture des descripteurs de fichier pour la lecture et l'écritureVoir) rend l'écriture dans le fichier/fifo ouvert avec lui non bloquant. Cela devrait fonctionner :

$ mkfifo /tmp/mylog
$ exec 4<>/tmp/mylog
$ myprogram 2>&1 | tee >&4
$ cat /tmp/mylog # on demend

Solution donnée par gniourf_gniourf sur le canal IRC #bash.


Cependant, cela créerait un fichier journal de plus en plus important même s'il n'est pas utilisé jusqu'à ce que le lecteur manque d'espace.

Pourquoi ne pas faire pivoter périodiquement les journaux ? Il y a même un programme pour le faire pour vous logrotate .

Il existe également un système pour générer des messages de journal et faire différentes choses avec eux selon le type. Il s'appelle syslog .

Vous pourriez même combiner les deux. Demandez à votre programme de générer des messages syslog, configurez syslog pour les placer dans un fichier et utilisez logrotate pour vous assurer qu'ils ne remplissent pas le disque.

S'il s'avère que vous écrivez pour un petit système embarqué et que la sortie du programme est lourde, vous pouvez envisager diverses techniques.

  • Syslog distant :envoyez les messages syslog à un serveur syslog sur le réseau.
  • Utilisez les niveaux de gravité disponibles dans syslog pour faire différentes choses avec les messages. Par exemple. rejeter "INFO" mais enregistrer et transférer "ERR" ou supérieur. Par exemple. consoler
  • Utilisez un gestionnaire de signaux dans votre programme pour relire la configuration sur HUP et faire varier la génération de journaux "à la demande" de cette façon.
  • Faites écouter votre programme sur un socket Unix et écrivez-y des messages lorsqu'il est ouvert. Vous pourriez même implémenter une console interactive dans votre programme de cette façon.
  • À l'aide d'un fichier de configuration, fournissez un contrôle précis de la sortie de journalisation.

Linux
  1. Linux - Comprendre la connexion Linux ?

  2. Linux - Configurer le gouverneur du processeur sur demande ou conservateur ?

  3. Vérification des référentiels Linux et de la journalisation

  4. Vérifiez et imprimez qui se connecte sous Linux / Unix

  5. Comment mmapper un tampon du noyau Linux à l'espace utilisateur?

Comment démarrer automatiquement une session d'écran sous Linux lors de la connexion

Guide complet de journalisation Linux

Gestion de la mémoire Linux - Mémoire virtuelle et pagination à la demande

C++ Récupère la chaîne du Presse-papiers sous Linux

Recharger les affectations de groupe d'un utilisateur Linux sans se déconnecter

Définir la capacité du tuyau sous Linux