. .

C/C++: Zugriff mehrerer Prozesse auf Shared Memory unter Berücksichtigung des 3rd readers/writers Problems


Im folgenden Listing gehe ich auf den Zugriff auf Shared Memory ein. Dabei soll sichergestellt werden, dass sich zwei Prozesse, die auf den gemeinsamen Shared Memory zugreifen wollen, nicht gegenseitig am Zugriff hindern und die Prozesse somit nicht quasi „verhungern“.
Siehe hierzu auch zum Beispiel Wikipedia bzw. etwas genauer auf rfc1149.net.

Der folgende Teil bezieht sich auf die Semaphoren und den Shared Memory an und für sich:
Die Header-Datei shmem.h:

#ifndef PRJ_SHMEM_H
#define PRJ_SHMEM_H 1

#include <prjconfig.h>

#define SEMKEYPATH "/opt/prj/sbin/prjd" /* Path used on ftok for semget key  */
#define SEMKEYID 1209                   /* Id used on ftok for semget key    */
#define SHMKEYPATH "/opt/prj/sbin/prjd" /* Path used on ftok for shmget key  */
#define SHMKEYID 1980                   /* Id used on ftok for shmget key    */

#define NUMSEMS 1                       /* Num of sems in created sem set    */
#define SHMEMSIZE 1024                  /* Size of shared memory             */

#if defined(__cplusplus)
extern "C" {
#endif

/*
 * Argument union handed to semctl() as its fourth parameter
 * (used with SETVAL in shmem.c).
 * NOTE(review): presumably skipped on DARWIN because that platform's
 * <sys/sem.h> already declares union semun - confirm.
 */
#ifndef DARWIN
union semun {
    int val; /* Value for SETVAL */
    struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
    unsigned short *array; /* Array for GETALL, SETALL */
    struct seminfo *__buf; /* Buffer for IPC_INFO (Linux-specific) */
};
#endif

/* Binds to (or creates) the shared memory segment keyed by ftok(fileName, keyid);
 * returns the segment id, or -1 on failure. */
int
prj_shm_create(size_t sz, char const *fileName, int keyid);

/* Fills semids[0..2] with three semaphore ids (0 mutex, 1 writer lock, 2 reader count);
 * returns 0 on success, -1 as soon as one of them cannot be obtained. */
int
prj_sem_create_all(int *semids, char const *fileName, int keyid);

/* Removes the semaphore set identified by semid (IPC_RMID). */
void
prj_sem_destroy(int semid);

/* Calls prj_sem_destroy() on semids[0], semids[1] and semids[2]. */
void
prj_sem_destroy_all(int *semids);

/* Removes the shared memory segment identified by shmid (IPC_RMID). */
void
prj_shm_destroy(int shmid);

/* Attaches the segment and stores the resulting address in *shm_address. */
void
prj_shm_attach(int shmid, void **shm_address);

/* Detaches the segment mapped at *shm_address. */
void
prj_shm_detach(void **shm_address);

/* NOTE(review): declared here, but no definition is visible in the accompanying shmem.c listing. */
int
prj_sem_block_shm_write(int semid);

/* NOTE(review): declared here, but no definition is visible in the accompanying shmem.c listing. */
int
prj_sem_unblock_shm_write(int semid);

/* Stores `reader' as the value of semaphore 0 of the set (SETVAL). */
void
prj_sem_set_reader(int semid, int reader);

/* Returns the value of semaphore 0 of the set (GETVAL), or -1 on error. */
int
prj_sem_get_reader(int semid);

/* Decrements semaphore 0 of the set by one (semop with sem_op = -1). */
void /* P */
prj_sem_lock(int semid);

/* Increments semaphore 0 of the set by one (semop with sem_op = +1). */
void /* V */
prj_sem_unlock(int semid);

#if defined(__cplusplus)
}
#endif

#endif /* !PRJ_SHMEM_H */

Und der Code selbst (shmem.c):
Die Funktionen zum Loggen und Debuggen müssen natürlich angepasst werden.

#include <prjconfig.h>

#define _XOPEN_SOURCE

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <signal.h>

#undef _XOPEN_SOURCE

#include "logger.h"
#include "shmem.h"

/*
 * \brief   prj_sem_create
 *
 * Binds to an existing semaphore set; if none exists yet, a new one is
 * created and initialised.
 *
 * Only the process that actually created the set initialises it
 * (IPC_CREAT | IPC_EXCL). A process that loses the creation race merely
 * binds to the set, so it can no longer reset a semaphore that another
 * process already holds.
 *
 * The initial value depends on the role of the semaphore: the mutex (0)
 * and the writer lock (1) start at 1 (free), the reader counter (2)
 * starts at 0, because prj_getBlacklist() interprets its value as the
 * number of active readers and takes the writer lock on the 0 -> 1
 * transition.
 *
 * \param   number          index of the semaphore: 0 mutex, 1 writer lock, 2 reader count
 * \param   fileName        path handed to ftok(), normally the application using the semaphore
 * \param   keyid           project id handed to ftok(); `number' is added to it
 *
 * \return                  on success the semaphore id, otherwise -1
 */
int
prj_sem_create(int number, char const *fileName, int keyid) {
    int semid;
    key_t semkey;
    union semun mysem;

    semkey = ftok(fileName, keyid + number);
    if (semkey == (key_t) -1) {
        prjlog("warn", "shmem", "ftok() for sem failed: %s (%d)", ERR, (int) errno);
        return -1;
    }

    semid = semget(semkey, NUMSEMS, 0600);
    if (semid == -1) {
        prjlog("info", "shmem", "semget() failed: %s (%d) (no create)", ERR, (int) errno);

        /* IPC_EXCL tells us whether we are the creator or lost the race */
        semid = semget(semkey, NUMSEMS, 0600 | IPC_CREAT | IPC_EXCL);
        if (semid == -1) {
            if (errno != EEXIST) {
                prjlog("warn", "shmem", "semget() failed: %s (%d)", ERR, (int) errno);
                return -1;
            }

            /* somebody else created it in the meantime: bind only, never re-initialise */
            semid = semget(semkey, NUMSEMS, 0600);
            if (semid == -1) {
                prjlog("warn", "shmem", "semget() failed: %s (%d)", ERR, (int) errno);
                return -1;
            }
        } else {
            /* reader counter starts with no readers, locks start unlocked */
            mysem.val = (number == 2) ? 0 : 1;

            if (semctl(semid, 0, SETVAL, mysem) == -1) {
                prjlog("warn", "shmem", "semctl() failed: %s (%d) - destroying semaphore", ERR, (int) errno);

                prj_sem_destroy(semid);
                return -1;
            }
        }
    }

    prjlog("debug", "shmem", "bound to semaphore %d", (int) number);

    return semid;
}

/*
 * \brief   prj_sem_create_all
 *
 * Obtains the three semaphores used for the readers/writers protocol
 * by calling prj_sem_create() once per slot.
 *
 * \param   semids          array with room for three semaphore ids, filled in slot order
 * \param   fileName        path handed on to prj_sem_create()
 * \param   keyid           key id handed on to prj_sem_create()
 *
 * \return                  0 if all three slots were filled, -1 as soon as one slot fails
 *                          (the failing slot holds -1, later slots are left untouched)
 */
int
prj_sem_create_all(int *semids, char const *fileName, int keyid) {
    int slot = 0;

    /* slot layout: 0 id_mutex, 1 id_wri, 2 id_rc */
    while (slot < 3) {
        int const id = prj_sem_create(slot, fileName, keyid);

        semids[slot] = id;
        if (id == -1)
            return -1;

        ++slot;
    }

    return 0;
}

/*
 * \brief   prj_shm_create
 *
 * Derives a key with ftok() and asks shmget() for the matching segment,
 * creating it (mode 0600) if it does not exist yet.
 *
 * \param   sz              size in bytes requested for the segment
 * \param   fileName        path handed to ftok(), normally the application using the segment
 * \param   keyid           project id handed to ftok()
 *
 * \return                  on success the shared memory id, otherwise -1
 */
int
prj_shm_create(size_t sz, char const *fileName, int keyid) {
    key_t const key = ftok(fileName, keyid);
    int id;

    if (key == (key_t) -1) {
        prjlog("warn", "shmem", "ftok() for shm failed: %s (%d)", ERR, (int) errno);
        return -1;
    }

    id = shmget(key, sz, 0600 | IPC_CREAT);
    if (id != -1) {
        prjlog("debug", "shmem", "bound to shared memory");
        return id;
    }

    prjlog("warn", "shmem", "shmget() failed: %s (%d)", ERR, (int) errno);
    return -1;
}

/*
 * \brief   prj_sem_destroy
 *
 * Removes the semaphore set identified by `semid' and logs the outcome.
 *
 * \param   semid           semaphore id obtained from prj_sem_create()
 *
 * \return                  nothing; a failure is only logged
 */
void
prj_sem_destroy(int semid) {
    int const rc = semctl(semid, 1, IPC_RMID);

    if (rc != -1) {
        prjlog("debug", "shmem", "semaphores destroyed");
        return;
    }

    prjlog("warn", "shmem", "semctl() remove id failed: %s (%d)", ERR, (int) errno);
}

/*
 * \brief   prj_sem_destroy_all
 *
 * Hands each of the three ids in `semids' to prj_sem_destroy().
 *
 * \param   semids          array of three semaphore ids as filled by prj_sem_create_all()
 *
 * \return                  nothing
 */
void
prj_sem_destroy_all(int *semids) {
    int *cur = semids;
    int *const end = semids + 3;

    while (cur != end) {
        prj_sem_destroy(*cur);
        ++cur;
    }
}

/*
 * \brief   prj_shm_destroy
 *
 * Removes the shared memory segment identified by `shmid' and logs the outcome.
 *
 * \param   shmid           shared memory id obtained from prj_shm_create()
 *
 * \return                  nothing; a failure is only logged
 */
void
prj_shm_destroy(int shmid) {
    struct shmid_ds scratch;
    int const rc = shmctl(shmid, IPC_RMID, &scratch);

    if (rc != -1) {
        prjlog("debug", "shmem", "shared memory destroyed");
        return;
    }

    prjlog("warn", "shmem", "shmctl() remove id failed: %s (%d)", ERR, (int) errno);
}

/*
 * \brief   prj_shm_attach
 *
 * Attaches the shared memory segment `shmid' to the address space of the
 * calling process and stores the mapped address in `*shm_address'.
 *
 * shmat() reports failure with (void *) -1, not with NULL. That sentinel
 * is translated to NULL here so that callers can keep testing the result
 * with `shm_address != NULL' before touching the memory.
 *
 * \param   shmid           shared memory id obtained from prj_shm_create()
 * \param   shm_address     receives the mapped address, or NULL if attaching failed
 *
 * \return                  nothing; success is signalled through `*shm_address'
 */
void
prj_shm_attach(int shmid, void **shm_address) {
    void *mapped = shmat(shmid, NULL, 0);

    if (mapped == (void *) -1) {
        prjlog("warn", "shmem", "Could not attach shared memory: %s (%d)", ERR, (int) errno);
        *shm_address = NULL;
    } else {
        *shm_address = mapped;
        prjlog("debug", "shmem", "attached shared memory");
    }
}

/*
 * \brief   prj_shm_detach
 *
 * Detaches the shared memory segment mapped at `*shm_address' and logs the outcome.
 *
 * \param   shm_address     pointer to the variable holding the mapped address
 *
 * \return                  nothing; a failure is only logged
 */
void
prj_shm_detach(void **shm_address) {
    int const rc = shmdt(*shm_address);

    if (rc != -1) {
        prjlog("debug", "shmem", "detached shared memory");
        return;
    }

    prjlog("warn", "shmem", "Could not detach shared memory: %s (%d)", ERR, (int) errno);
}

/*
 * \brief   prj_sem_set_reader
 *
 * Sets a counter of corresponding readers.
 *
 * \param   semid           identifier of semaphore which is responsible for count of readers
 * \param   readers         number of current reader
 * 
 * \return                  void()
 */
void
prj_sem_set_reader(int semid, int reader) {
    union semun rsemun;
    
    rsemun.val = reader;
    semctl(semid, 0, SETVAL, rsemun);

}

/*
 * \brief   prj_sem_get_reader
 *
 * Reads the current value of semaphore 0 of the set `semid'.
 *
 * \param   semid           id of the semaphore used as reader counter
 *
 * \return                  the semaphore value as reported by semctl(GETVAL),
 *                          or -1 if semctl() fails
 */
int
prj_sem_get_reader(int semid) {
    int const current = semctl(semid, 0, GETVAL);

    return current;
}

/*
 * \brief   prj_sem_lock
 *
 * Locks a semaphore (P operation): decrements semaphore 0 of the set by
 * one and blocks while its value is 0.
 *
 * A wait that is merely interrupted by a signal (EINTR) is restarted
 * instead of being treated as a fatal error. On any other failure the
 * process logs the problem and terminates itself, unless it runs with
 * effective uid 0, in which case the function just returns.
 *
 * \param   semid           identifier of semaphore
 *
 * \return                  nothing
 */
void /* P */
prj_sem_lock(int semid) {
    struct sembuf mysemop;
    int rc;

    mysemop.sem_num = 0;
    mysemop.sem_op = -1;
    mysemop.sem_flg = 0;

    /* a signal handler running while we wait must not kill the process */
    do {
        rc = semop(semid, &mysemop, (size_t) 1);
    } while (rc == -1 && errno == EINTR);

    if (rc == -1) {
        uid_t uid = geteuid();
        if (uid != 0) {
            prjlog("critical", "shmem", "semop (lock) failed - committing suicide (%ld)", (long) getpid());
            kill(getpid(), SIGTERM);
            kill(getpid(), SIGKILL);
        }
    }
}

/*
 * \brief   prj_sem_unlock
 *
 * Unlocks a semaphore (V operation): increments semaphore 0 of the set
 * by one.
 *
 * If semop() fails the process logs the problem and terminates itself,
 * unless it runs with effective uid 0, in which case the function just
 * returns.
 *
 * \param   semid           identifier of semaphore
 *
 * \return                  nothing
 */
void /* V */
prj_sem_unlock(int semid) {
    struct sembuf mysemop;

    mysemop.sem_num = 0;
    mysemop.sem_op = 1;
    mysemop.sem_flg = 0;

    if (semop(semid, &mysemop, (size_t) 1) == -1) {
        uid_t uid = geteuid();
        if (uid != 0) {
            /* message names the unlock path so it can be told apart from prj_sem_lock() */
            prjlog("critical", "shmem", "semop (unlock) failed - committing suicide (%ld)", (long) getpid());
            kill(getpid(), SIGTERM);
            kill(getpid(), SIGKILL);
        }
    }
}

Im folgenden beispielhaften Listing werden sodann die oben zur Verfügung gestellten Funktionen verwendet.
Das Ganze hierbei, um nicht verfügbare Rechnersysteme in einer Blacklist zu verwalten, die von mehreren Child-Prozessen abgerufen wird (diesmal ein bisschen C++).
Einige eigene Funktionen, die hier verwendet werden aber nicht zur Verfügung stehen, ändern am Prinzip der Funktionsweise des Codes nichts.
Erst einmal die Definitionen in blacklist.hh:

#ifndef PRJ_BLACKLIST_HH
#define PRJ_BLACKLIST_HH 1

#include <string>
#include <vector>
#include <iostream>

#define BLACKLIST_DELIMITER "#"

/* Reads the blacklist string from the shared memory segment and returns its
 * entries, split at BLACKLIST_DELIMITER. */
std::vector <std::string>
prj_getBlacklist(int *semids, int shmid);

/* Returns every entry of `fulllist' that does not occur in `blacklist'. */
std::vector <std::string>
prj_getWhitelist(std::vector<std::string>& blacklist, std::vector<std::string>& fulllist);

/* Overwrites the blacklist in the shared memory segment with the string "#". */
void
prj_resetBlacklist(int *semids, int shmid);

/* Appends `dead_server' followed by "#" to the blacklist in the shared memory segment. */
int
prj_blacklistServer(std::string dead_server, int *semids, int shmid);

#endif

Und dann der Code in blacklist.cc:

#include <prjconfig.h>

#include <sys/shm.h>

#include <algorithm>
#include <cstring>
#include <iostream>
#include <iterator>
#include <set>
#include <sstream>
#include <string>
#include <vector>

#include "blacklist.hh"
#include "split.hh"
#include "shmem.h"
#include "debug.h"
#include "logger.h"

/*
 * \brief   prj_getBlacklist
 *
 * Returns the currently blacklisted servers as a vector of strings.
 *
 * The function acts as a reader in the readers/writers protocol:
 * semids[0] is the mutex guarding the reader counter, semids[2] holds the
 * reader counter and semids[1] is the writer lock, which the first reader
 * takes and the last reader releases.
 *
 * The mutex is released on every exit path. Returning with the mutex
 * still held would block every later caller forever.
 *
 * \param   semids          container of semaphores to avoid 3rd readers/writers problem
 * \param   shmid           identifier of shared memory segment holding current blacklist
 *
 * \return                  vector of strings containing blacklisted server; empty if the
 *                          reader counter or the shared memory could not be read
 */
std::vector <std::string>
prj_getBlacklist(int *semids, int shmid) {
#ifdef _DEBUG
    PDEBUG_START
#endif

    int reader;
    void *shm_address;
    std::string blacklist;
    std::vector<std::string> blacklisted;

    /* entry section: register as reader */
    prj_sem_lock(semids[0]);

    reader = prj_sem_get_reader(semids[2]);
    if (reader == -1) {
        /* never leave with the mutex held */
        prj_sem_unlock(semids[0]);
#ifdef _DEBUG
        PDEBUG_FAILED
#endif
        return blacklisted;
    }
    reader += 1;
    prj_sem_set_reader(semids[2], reader);

    if (reader == 1) {
        /* first reader locks out the writers */
        prj_sem_lock(semids[1]);
    }

    prj_sem_unlock(semids[0]);

    /* **************************************************************************************** */

    prj_shm_attach(shmid, &shm_address);

    /* shmat() signals failure with (void *) -1, so check for that sentinel as well */
    if (shm_address != NULL && shm_address != (void *) -1) {
        blacklist = (char *) shm_address;
#ifdef _DEBUG
        prjdbg("[%s:%d] [- %s] Current readable blacklist: '%s'.", __FILE__, __LINE__, __FUNCTION__, blacklist.c_str());
#endif
        prjlog("debug", "ad", "current blacklist: %s", blacklist.c_str());

        StringExplode(blacklist, BLACKLIST_DELIMITER, &blacklisted);

        prj_shm_detach(&shm_address);
    }
#ifdef _DEBUG
    else
        prjdbg("[%s:%d] [- %s] Current blacklist is not readable.", __FILE__, __LINE__, __FUNCTION__);
#endif

    /* **************************************************************************************** */

    /* exit section: deregister as reader */
    prj_sem_lock(semids[0]);

    reader = prj_sem_get_reader(semids[2]);
    if (reader == -1) {
        /* the counter is unreadable, but the mutex must still be given back */
        prj_sem_unlock(semids[0]);
#ifdef _DEBUG
        PDEBUG_FAILED
#endif
        return blacklisted;
    }

    reader -= 1;
    prj_sem_set_reader(semids[2], reader);

    if (reader == 0) {
        /* last reader lets the writers in again */
        prj_sem_unlock(semids[1]);
    }

    prj_sem_unlock(semids[0]);

#ifdef _DEBUG
    PDEBUG_END
#endif
    return blacklisted;
}

/*
 * \brief   prj_getWhitelist
 *
 * Builds the list of available servers: every entry of `fulllist' that
 * has no equal entry in `blacklist', in the order of `fulllist'.
 *
 * \param   blacklist       vector of strings of current blacklisted server
 * \param   fulllist        vector of strings of all configured server
 *
 * \return                  vector of strings containing available server
 */
std::vector <std::string>
prj_getWhitelist(std::vector<std::string>& blacklist, std::vector<std::string>& fulllist) {
#ifdef _DEBUG
    PDEBUG_START
#endif
    std::vector <std::string> available;

    for (std::vector<std::string>::const_iterator server = fulllist.begin();
         server != fulllist.end(); ++server) {
#ifdef _DEBUG
        prjdbg("[%s:%d] [- %s] Fulllist: '%s'.", __FILE__, __LINE__, __FUNCTION__, server->c_str());
#endif
        bool isBlacklisted = false;

        /* the whole blacklist is walked on purpose, every entry gets its debug line */
        for (std::vector<std::string>::const_iterator dead = blacklist.begin();
             dead != blacklist.end(); ++dead) {
#ifdef _DEBUG
            prjdbg("[%s:%d] [- %s] Blacklist: '%s'.", __FILE__, __LINE__, __FUNCTION__, dead->c_str());
#endif
            if (strcmp(dead->c_str(), server->c_str()) == 0)
                isBlacklisted = true;
        }

        if (!isBlacklisted)
            available.push_back(*server);
    }

#ifdef _DEBUG
    PDEBUG_END
#endif
    return available;
}

/*
 * \brief   prj_resetBlacklist
 *
 * Resets the blacklist stored in shared memory to the string "#".
 * The write is done while holding the writer lock semids[1].
 *
 * Nothing is written when the segment could not be attached. shmat()
 * reports that with (void *) -1, so the sentinel is checked in addition
 * to NULL; writing through it would crash the process.
 *
 * \param   semids          container of semaphores to avoid 3rd readers/writers problem
 * \param   shmid           identifier of shared memory segment holding current blacklist
 *
 * \return                  nothing
 */
void
prj_resetBlacklist(int *semids, int shmid) {
#ifdef _DEBUG
    PDEBUG_START
#endif

    void *shm_address;

    prj_sem_lock(semids[1]);
    prj_shm_attach(shmid, &shm_address);

    if (shm_address != NULL && shm_address != (void *) -1) {
        strcpy((char *) shm_address, "#");
        prj_shm_detach(&shm_address);
    }

    prj_sem_unlock(semids[1]);

#ifdef _DEBUG
    PDEBUG_END
#endif
}

/*
 * \brief   prj_blacklistServer
 *
 * Appends an unavailable server, followed by "#", to the blacklist stored
 * in shared memory. The update is done while holding the writer lock
 * semids[1].
 *
 * The segment has a fixed size, so the new list is only written if it,
 * including its terminating NUL, fits into the size reported by
 * shmctl(IPC_STAT). Otherwise the stored list is left untouched. The
 * stored list is likewise only read if a terminating NUL is found inside
 * the segment.
 *
 * \param   dead_server     string of name of server which is actually down
 * \param   semids          container of semaphores to avoid 3rd readers/writers problem
 * \param   shmid           identifier of shared memory segment holding current blacklist
 *
 * \return                  0 if the server was added, -1 if the segment could not be
 *                          attached or inspected, or if the new list does not fit
 */
int
prj_blacklistServer(std::string dead_server, int *semids, int shmid) {
#ifdef _DEBUG
    prjdbg("[%s:%d] [< %s] Blacklisting server '%s'.", __FILE__, __LINE__, __FUNCTION__, dead_server.c_str());
#endif

    void *shm_address;
    struct shmid_ds shm_info;
    int rc = -1;

    prj_sem_lock(semids[1]);

    prj_shm_attach(shmid, &shm_address);

    /* shmat() signals failure with (void *) -1, so check for that sentinel as well */
    if (shm_address != NULL && shm_address != (void *) -1) {
        char *segment = static_cast<char *>(shm_address);

        if (shmctl(shmid, IPC_STAT, &shm_info) != -1) {
            const size_t capacity = static_cast<size_t>(shm_info.shm_segsz);

            /* only treat the segment as a C string if it is terminated inside its bounds */
            if (std::memchr(segment, '\0', capacity) != NULL) {
#ifdef _DEBUG
                prjdbg("[%s:%d] [- %s] Old blacklist is '%s'.", __FILE__, __LINE__, __FUNCTION__, (char *) shm_address);
#endif
                std::string newlist(segment);
                newlist += dead_server;
                newlist += "#";
#ifdef _DEBUG
                prjdbg("[%s:%d] [- %s] New generated blacklist is '%s'.", __FILE__, __LINE__, __FUNCTION__, newlist.c_str());
#endif
                /* size() excludes the NUL, hence strictly smaller than the capacity */
                if (newlist.size() < capacity) {
                    std::memcpy(segment, newlist.c_str(), newlist.size() + 1);
                    rc = 0;
#ifdef _DEBUG
                    prjdbg("[%s:%d] [- %s] New blacklist is now '%s'.", __FILE__, __LINE__, __FUNCTION__, (char *) shm_address);
#endif
                }
            }
        }

        prj_shm_detach(&shm_address);
    }

    prj_sem_unlock(semids[1]);

#ifdef _DEBUG
    PDEBUG_END
#endif

    return rc;
}

Oben also ein Beispiel dafür, wie Shared Memory bei mehreren konkurrierenden Prozessen verwendet werden kann.
Der eigentliche Aufruf der oben aufgeführten Funktionen kann im Prinzip dann so aussehen:

    /* some code before */

    int prj_glob_semid[3] =  { -1, -1, -1};
    int prj_glob_shmid = -1;

    prj_sem_create_all(prj_glob_semid, SEMKEYPATH, SEMKEYID); /* Macros s. shmem.h */
    prj_glob_shmid = prj_shm_create((size_t) SHMEMSIZE, SHMKEYPATH, SHMKEYID);

    prj_resetBlacklist(prj_glob_semid, prj_glob_shmid);

    prj_sem_destroy_all(prj_glob_semid);
    if (prj_glob_shmid != -1) /* otherwise a segfault */
          prj_shm_destroy(prj_glob_shmid);

    /* some code behind */

Hinterlasse eine Antwort

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind markiert *

Du kannst folgende HTML-Tags benutzen: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>