C/C++: Zugriff mehrerer Prozesse auf Shared Memory unter Berücksichtigung des 3rd readers/writers Problems
Im folgenden Listing gehe ich auf den Zugriff auf Shared Memory ein. Dabei soll sichergestellt werden, dass zwei Prozesse, die auf den gemeinsamen Shared Memory zugreifen wollen, sich nicht gegenseitig am Zugriff hindern, die Prozesse also quasi „verhungern“.
Siehe hierzu auch zum Beispiel Wikipedia bzw. etwas genauer auf rfc1149.net.
Der folgende Teil bezieht sich auf die Semaphoren und den Shared Memory an für sich:
Die Header-Datei shmem.h:
#ifndef PRJ_SHMEM_H #define PRJ_SHMEM_H 1 #include <prjconfig.h> #define SEMKEYPATH "/opt/prj/sbin/prjd" /* Path used on ftok for semget key */ #define SEMKEYID 1209 /* Id used on ftok for semget key */ #define SHMKEYPATH "/opt/prj/sbin/prjd" /* Path used on ftok for shmget key */ #define SHMKEYID 1980 /* Id used on ftok for shmget key */ #define NUMSEMS 1 /* Num of sems in created sem set */ #define SHMEMSIZE 1024 /* Size of shared memory */ #if defined(__cplusplus) extern "C" { #endif #ifndef DARWIN union semun { int val; /* Value for SETVAL */ struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ unsigned short *array; /* Array for GETALL, SETALL */ struct seminfo *__buf; /* Buffer for IPC_INFO (Linux-specific) */ }; #endif int prj_shm_create(size_t sz, char const *fileName, int keyid); int prj_sem_create_all(int *semids, char const *fileName, int keyid); void prj_sem_destroy(int semid); void prj_sem_destroy_all(int *semids); void prj_shm_destroy(int shmid); void prj_shm_attach(int shmid, void **shm_address); void prj_shm_detach(void **shm_address); int prj_sem_block_shm_write(int semid); int prj_sem_unblock_shm_write(int semid); void prj_sem_set_reader(int semid, int reader); int prj_sem_get_reader(int semid); void /* P */ prj_sem_lock(int semid); void /* V */ prj_sem_unlock(int semid); #if defined(__cplusplus) } #endif #endif /* !PRJ_SHMEM_H */
Und der Code selbst (shmem.c):
Die Funktionen zum Loggen und Debuggen müssen natürlich angepasst werden.
#include <prjconfig.h> #define _XOPEN_SOURCE #include <stdio.h> #include <string.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> #include <sys/shm.h> #include <signal.h> #undef _XOPEN_SOURCE #include "logger.h" #include "shmem.h" /* * \brief prj_sem_create * * Tries to bound to an existing semaphore. * If this fails a new semaphore will be created. * * \param number incremental identifier for one of the tree semaphores * \param fileName normally path of application which wants to use this semaphore * \param keyid selfmade identifier * * \return on success internal semaphore id, otherwise -1 */ int prj_sem_create(int number, char const *fileName, int keyid) { int rc, semid; key_t semkey; union semun mysem; semkey = ftok(fileName, keyid + number); if (semkey == (key_t) -1) { prjlog("warn", "shmem", "ftok() for sem failed: %s (%d)", ERR, (int) errno); return -1; } semid = semget(semkey, NUMSEMS, 0600); if (semid == -1) { prjlog("info", "shmem", "semget() failed: %s (%d) (no create)", ERR, (int) errno); semid = semget(semkey, NUMSEMS, 0600 | IPC_CREAT); if (semid == -1) { prjlog("warn", "shmem", "semget() failed: %s (%d)", ERR, (int) errno); return -1; } else { mysem.val = 1; rc = semctl(semid, 0, SETVAL, mysem); if (rc == -1) { prjlog("warn", "shmem", "semctl() failed: %s (%d) - destroying semaphore", ERR, (int) errno); prj_sem_destroy(semid); return -1; } } } prjlog("debug", "shmem", "bound to semaphore %d", (int) number); return semid; } /* * \brief prj_sem_create_all * * Creates three semaphores needed to avoid 3rd readers/writers problem * * \param semids container which will hold the three semaphore identifier * \param fileName normally path of application which wants to use this semaphore * \param keyid selfmade identifier * * \return on success filled semaphore container, otherwise -1 */ int prj_sem_create_all(int *semids, char const *fileName, int keyid) { int y; /* * 0 id_mutex * 1 id_wri * 2 id_rc */ for (y = 0; y < 3; y++) { semids[y] = prj_sem_create(y, fileName, keyid); if (semids[y] == -1) return -1; } return 0; } /* * \brief prj_shm_create * * Tries to bound to an existing shared memory segment. * If this fails a new shared memory segment will be created. * * \param sz size of shared memory segment to create * \param fileName normally path of application which wants to use this shared memory segment * \param keyid selfmade identifier * * \return on success internal shared memory id, otherwise -1 */ int prj_shm_create(size_t sz, char const *fileName, int keyid) { int shmid; key_t shmkey; shmkey = ftok(fileName, keyid); if (shmkey == (key_t) -1) { prjlog("warn", "shmem", "ftok() for shm failed: %s (%d)", ERR, (int) errno); return -1; } shmid = shmget(shmkey, sz, 0600 | IPC_CREAT); if (shmid == -1) { prjlog("warn", "shmem", "shmget() failed: %s (%d)", ERR, (int) errno); return -1; } prjlog("debug", "shmem", "bound to shared memory"); return shmid; } /* * \brief prj_sem_destroy * * Destroys semaphore identified by `semid'. * * \param semid semaphore id got by prj_sem_create() * * \return void() */ void prj_sem_destroy(int semid) { if (semctl(semid, 1, IPC_RMID) == -1) { prjlog("warn", "shmem", "semctl() remove id failed: %s (%d)", ERR, (int) errno); } else { prjlog("debug", "shmem", "semaphores destroyed"); } } /* * \brief prj_sem_destroy_all * * Destroys all three semaphores created by prj_sem_create_all() * * \param semids container filled with semaphore ids * * \return void() */ void prj_sem_destroy_all(int *semids) { int y; for (y = 0; y < 3; y++) { prj_sem_destroy(semids[y]); } } /* * \brief prj_shm_destroy * * Destroys shared memory segment identified by `shmid' * * \param shmid shared memory id got by prj_shm_create() * * \return void() */ void prj_shm_destroy(int shmid) { struct shmid_ds shmid_struct; if (shmctl(shmid, IPC_RMID, &shmid_struct) == -1) { prjlog("warn", "shmem", "shmctl() remove id failed: %s (%d)", ERR, (int) errno); } else { prjlog("debug", "shmem", "shared memory destroyed"); } } /* * \brief prj_shm_attach * * Attach to a shared memory segment. Pointer of address is stored in `shm_address'. * * \param shmid shared memory id got by prj_shm_create() * \param shm_address Pointer of pointer to address of shared memory segment * * \return void() */ void prj_shm_attach(int shmid, void **shm_address) { *shm_address = shmat(shmid, NULL, 0); if (*shm_address == NULL) { prjlog("warn", "shmem", "Could not attach shared memory: %s (%d)", ERR, (int) errno); } else prjlog("debug", "shmem", "attached shared memory"); } /* * \brief prj_shm_detach * * Detach a shared memory segment. * * \param shm_address Pointer of pointer to address of shared memory segment * * \return void() */ void prj_shm_detach(void **shm_address) { if (shmdt(*shm_address) == -1) { prjlog("warn", "shmem", "Could not detach shared memory: %s (%d)", ERR, (int) errno); } else prjlog("debug", "shmem", "detached shared memory"); } /* * \brief prj_sem_set_reader * * Sets a counter of corresponding readers. * * \param semid identifier of semaphore which is responsible for count of readers * \param readers number of current reader * * \return void() */ void prj_sem_set_reader(int semid, int reader) { union semun rsemun; rsemun.val = reader; semctl(semid, 0, SETVAL, rsemun); } /* * \brief prj_sem_get_reader * * returns number of current readers * * \param semid identifier of semaphore which is responsible for count of readers * * \return void() */ int prj_sem_get_reader(int semid) { return semctl(semid, 0, GETVAL); } /* * \brief prj_sem_lock * * locks access to a semaphore * * \param semid identifier of semaphore * * \return void() */ void /* P */ prj_sem_lock(int semid) { struct sembuf mysemop; mysemop.sem_num = 0; mysemop.sem_op = -1; mysemop.sem_flg = 0; if (semop(semid, &mysemop, (size_t) 1) == -1) { uid_t uid = geteuid(); if (uid != 0) { prjlog("critical", "shmem", "semop (lock) failed - committing suicide (%ld)", (long) getpid()); kill(getpid(), SIGTERM); kill(getpid(), SIGKILL); } } } /* * \brief prj_sem_unlock * * unlocks access to a semaphore * * \param semid identifier of semaphore * * \return void() */ void /* V */ prj_sem_unlock(int semid) { struct sembuf mysemop; mysemop.sem_num = 0; mysemop.sem_op = 1; mysemop.sem_flg = 0; if (semop(semid, &mysemop, (size_t) 1) == -1) { uid_t uid = geteuid(); if (uid != 0) { prjlog("critical", "shmem", "semop (lock) failed - committing suicide (%ld)", (long) getpid()); kill(getpid(), SIGTERM); kill(getpid(), SIGKILL); } } }
Im folgenden beispielhaften Listing werden sodann die oben zur Verfügung gestellten Funktionen verwendet.
Das Ganze hierbei, um nicht verfügbare Rechnersysteme in einer Blacklist zu verwalten, die von mehreren Child-Prozessen abgerufen wird (diesmal ein bisschen C++).
Einige eigene Funktionen, die hier verwendet werden aber nicht zur Verfügung stehen, ändern am Prinzip der Funktionsweise des Codes nichts.
Erst einmal die Definitionen in blacklist.hh:
#ifndef PRJ_BLACKLIST_HH #define PRJ_BLACKLIST_HH 1 #include <string> #include <vector> #include <iostream> #define BLACKLIST_DELIMITER "#" std::vector <std::string> prj_getBlacklist(int *semids, int shmid); std::vector <std::string> prj_getWhitelist(std::vector<std::string>& blacklist, std::vector<std::string>& fulllist); void prj_resetBlacklist(int *semids, int shmid); int prj_blacklistServer(std::string dead_server, int *semids, int shmid); #endif
Und dann der Code in blacklist.cc:
#include <prjconfig.h> #include <string> #include <vector> #include <iostream> #include <sstream> #include <algorithm> #include <iterator> #include <set> #include "blacklist.hh" #include "split.hh" #include "shmem.h" #include "debug.h" #include "logger.h" /* * \brief prj_getBlacklist * * Returns a vector of Strings which contains current unavailable servers. * * \param semids container of semaphores to avoid 3rd readers/writers problem * \param shmid identifier of shared memory segment holding current blacklist * * \return vector of strings containing blacklisted server */ std::vector <std::string> prj_getBlacklist(int *semids, int shmid) { #ifdef _DEBUG PDEBUG_START #endif int reader; void *shm_address; std::stringstream tmp; std::string blacklist; std::vector<std::string> blacklisted; prj_sem_lock(semids[0]); reader = prj_sem_get_reader(semids[2]); if (reader == -1) { #ifdef _DEBUG PDEBUG_FAILED #endif return blacklisted; } reader += 1; prj_sem_set_reader(semids[2], reader); if (reader == 1) { prj_sem_lock(semids[1]); } prj_sem_unlock(semids[0]); /* **************************************************************************************** */ prj_shm_attach(shmid, &shm_address); if (shm_address != NULL) { tmp << (char *) shm_address; blacklist = tmp.str(); #ifdef _DEBUG prjdbg("[%s:%d] [- %s] Current readable blacklist: '%s'.", __FILE__, __LINE__, __FUNCTION__, blacklist.c_str()); #endif prjlog("debug", "ad", "current blacklist: %s", blacklist.c_str()); StringExplode(blacklist, BLACKLIST_DELIMITER, &blacklisted); prj_shm_detach(&shm_address); } #ifdef _DEBUG else prjdbg("[%s:%d] [- %s] Current blacklist is not readable.", __FILE__, __LINE__, __FUNCTION__); #endif /* **************************************************************************************** */ prj_sem_lock(semids[0]); reader = prj_sem_get_reader(semids[2]); if (reader == -1) { #ifdef _DEBUG PDEBUG_FAILED #endif return blacklisted; } reader -= 1; prj_sem_set_reader(semids[2], reader); if (reader == 0) { prj_sem_unlock(semids[1]); } prj_sem_unlock(semids[0]); #ifdef _DEBUG PDEBUG_END #endif return blacklisted; } /* * \brief prj_getWhitelist * * Returns a vector of Strings which contains current available servers. * * \param blacklist vector of strings of current blacklisted server * \param fulllist vector of strings of all configured server * * \return vector of strings containing available server */ std::vector <std::string> prj_getWhitelist(std::vector<std::string>& blacklist, std::vector<std::string>& fulllist) { #ifdef _DEBUG PDEBUG_START #endif std::vector <std::string> whitelist; size_t i, j; bool found = false; for (i = 0; i < fulllist.size(); ++i) { #ifdef _DEBUG prjdbg("[%s:%d] [- %s] Fulllist: '%s'.", __FILE__, __LINE__, __FUNCTION__, fulllist[i].c_str()); #endif for (j = 0; j < blacklist.size(); j++) { #ifdef _DEBUG prjdbg("[%s:%d] [- %s] Blacklist: '%s'.", __FILE__, __LINE__, __FUNCTION__, blacklist[j].c_str()); #endif if (!strcmp(blacklist[j].c_str(), fulllist[i].c_str())) found = true; } if (found != true) whitelist.push_back(fulllist[i]); found = false; } #ifdef _DEBUG PDEBUG_END #endif return whitelist; } /* * \brief prj_resetBlacklist * * resets current blacklist stored in shared memory * * \param semids container of semaphores to avoid 3rd readers/writers problem * \param shmid identifier of shared memory segment holding current blacklist * * \return void() */ void prj_resetBlacklist(int *semids, int shmid) { #ifdef _DEBUG PDEBUG_START #endif void *shm_address; prj_sem_lock(semids[1]); prj_shm_attach(shmid, &shm_address); if (shm_address != NULL) { strcpy((char *) shm_address, "#"); prj_shm_detach(&shm_address); } prj_sem_unlock(semids[1]); #ifdef _DEBUG PDEBUG_END #endif } /* * \brief prj_blacklistServer * * adds an unavailable server to current blacklist stored in shared memory * * \param dead_server string of name of server which is actually down * \param semids container of semaphores to avoid 3rd readers/writers problem * \param shmid identifier of shared memory segment holding current blacklist * * \return returns always zero */ int prj_blacklistServer(std::string dead_server, int *semids, int shmid) { #ifdef _DEBUG prjdbg("[%s:%d] [< %s] Blacklisting server '%s'.", __FILE__, __LINE__, __FUNCTION__, dead_server.c_str()); #endif void *shm_address; std::stringstream copy; std::string newlist; prj_sem_lock(semids[1]); prj_shm_attach(shmid, &shm_address); if (shm_address != NULL) { #ifdef _DEBUG prjdbg("[%s:%d] [- %s] Old blacklist is '%s'.", __FILE__, __LINE__, __FUNCTION__, (char *) shm_address); #endif copy << (char *) shm_address; copy << dead_server << "#"; #ifdef _DEBUG prjdbg("[%s:%d] [- %s] New generated blacklist is '%s'.", __FILE__, __LINE__, __FUNCTION__, copy.str().c_str()); #endif /* long way around for HP-UX only, i'm not happy about this */ newlist = copy.str(); strcpy((char *) shm_address, newlist.c_str()); #ifdef _DEBUG prjdbg("[%s:%d] [- %s] New blacklist is now '%s'.", __FILE__, __LINE__, __FUNCTION__, (char *) shm_address); #endif prj_shm_detach(&shm_address); } prj_sem_unlock(semids[1]); #ifdef _DEBUG PDEBUG_END #endif return 0; }
Oben also ein Beispiel dafür, wie Shared Memory bei mehreren konkurrierenden Prozessen verwendet werden kann.
Der eigentliche Aufruf der oben aufgeführten Funktionen kann im Prinzip dann so aussehen:
/* some code before */ int prj_glob_semid[3] = { -1, -1, -1}; int prj_glob_shmid = -1; prj_sem_create_all(prj_glob_semid, SEMKEYPATH, SEMKEYID); /* Macros s. shmem.h */ prj_glob_shmid = prj_shm_create((size_t) SHMEMSIZE, SHMKEYPATH, SHMKEYID); prj_resetBlacklist(prj_glob_semid, prj_glob_shmid); prj_sem_destroy_all(prj_glob_semid); if (prj_glob_shmid != -1) /* otherwise a segfault */ prj_shm_destroy(prj_glob_shmid); /* some code behind */