[Openais] Patch II design review revision 1 !!
Muni Bajpai
muniba at nortel.com
Wed Feb 23 12:18:32 PST 2005
Skipped content of type multipart/alternative-------------- next part --------------
--- ../latest/exec/ckpt.c 2005-02-23 12:56:16.000000000 -0600
+++ ../bk_openais/exec/ckpt.c 2005-02-23 13:02:46.000000000 -0600
@@ -16,7 +16,6 @@
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
@@ -57,17 +56,30 @@
#include "totempg.h"
#define LOG_SERVICE LOG_SERVICE_CKPT
+#define CKPT_MAX_SECTION_DATA_SEND (1024*400)
#include "print.h"
-DECLARE_LIST_INIT(checkpointListHead);
+DECLARE_LIST_INIT(checkpoint_list_head);
-DECLARE_LIST_INIT(checkpointIteratorListHead);
+DECLARE_LIST_INIT(checkpoint_iterator_list_head);
+
+DECLARE_LIST_INIT(recovery_sync_list_head);
+
+DECLARE_LIST_INIT(checkpoint_recovery_list_head);
struct checkpoint_cleanup {
struct list_head list;
struct saCkptCheckpoint *checkpoint;
};
+struct checkpoint_sync {
+ struct list_head list;
+ struct req_exec_ckpt_synchronize_state request;
+};
+#ifdef TODO
+static void *tok_call_handle = 0;
+#endif
+
//TODO static totempg_recovery_plug_handle ckpt_checkpoint_recovery_plug_handle;
static int ckpt_exec_init_fn (void);
@@ -78,6 +90,8 @@
static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct in_addr source_addr, int endian_conversion_required);
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required);
+
static int message_handler_req_exec_ckpt_checkpointclose (void *message, struct in_addr source_addr, int endian_conversion_required);
static int message_handler_req_exec_ckpt_checkpointunlink (void *message, struct in_addr source_addr, int endian_conversion_required);
@@ -133,24 +147,28 @@
static int message_handler_req_lib_ckpt_sectioniteratorinitialize (struct conn_info *conn_info, void *message);
static int message_handler_req_lib_ckpt_sectioniteratornext (struct conn_info *conn_info, void *message);
-static int ckpt_confchg_fn (
- enum totempg_configuration_type configuration_type,
- struct in_addr *member_list, void *member_list_private,
- int member_list_entries,
- struct in_addr *left_list, void *left_list_private,
- int left_list_entries,
- struct in_addr *joined_list, void *joined_list_private,
- int joined_list_entries,
- struct memb_ring_id *ring_id) {
+//RECOVERY_MUNI
+static void ckpt_recovery_inititialize ();
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *);
+static int ckpt_recovery_finalize();
+static int ckpt_recovery_abort();
+static int ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries);
+static int recovery_section_create (SaCkptSectionDescriptorT *sectionDescriptor, SaNameT *checkpointName);
+static int recovery_section_write(SaCkptSectionDescriptorT *sectionDescriptor, SaNameT *checkpointName,
+ void *newData, SaUint32T dataOffSet, SaUint32T dataSize);
-#ifdef TODO
- if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
- totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
- }
-#endif
- return (0);
-}
+static struct memb_ring_id saved_ring_id;
+
+static int ckpt_confchg_fn(
+ enum totempg_configuration_type configuration_type,
+ struct in_addr *member_list, void *member_list_private,
+ int member_list_entries,
+ struct in_addr *left_list, void *left_list_private,
+ int left_list_entries,
+ struct in_addr *joined_list, void *joined_list_private,
+ int joined_list_entries,
+ struct memb_ring_id *ring_id);
struct libais_handler ckpt_libais_handlers[] =
{
@@ -258,7 +276,8 @@
message_handler_req_exec_ckpt_sectionexpirationtimeset,
message_handler_req_exec_ckpt_sectionwrite,
message_handler_req_exec_ckpt_sectionoverwrite,
- message_handler_req_exec_ckpt_sectionread
+ message_handler_req_exec_ckpt_sectionread,
+ message_handler_req_exec_ckpt_synchronize_state
};
struct service_handler ckpt_service_handler = {
@@ -273,18 +292,289 @@
.exec_dump_fn = 0
};
-static struct memb_ring_id saved_ring_id;
+static int setProcessorIndex(struct in_addr *proc_addr,
+ struct ckpt_refcnt *ckpt_refcount) { //RECOVERY_MUNI
+ int i;
+ for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) {
+
+ // If the source addresses match then this processor index
+ // has already been set
+
+ if (ckpt_refcount[i].addr.s_addr == proc_addr->s_addr) {
+ return -1;
+ }
+
+ // If the source addresses do not match and this element
+ // has no stored value then store the new value and
+ // return the Index.
+
+ else if (ckpt_refcount[i].addr.s_addr == 0) {
+ memcpy(&ckpt_refcount[i].addr, proc_addr, sizeof(struct in_addr));
+ return i;
+ }
+ }
+ // Could not Find an empty slot
+ // to store the new Processor.
+ return -1;
+}
+
+static int findProcessorIndex(struct in_addr *proc_addr,
+ struct ckpt_refcnt *ckpt_refcount) { //RECOVERY_MUNI
+ int i;
+ for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) {
+ //If the source addresses match then return the index
+
+ if (ckpt_refcount[i].addr.s_addr == proc_addr->s_addr) {
+ return i;
+ }
+ }
+ // Could not Find the Processor
+ return -1;
+}
+
+static void initialize_ckpt_refcount_array (struct ckpt_refcnt *ckpt_refcount) {
+ memset((char*)ckpt_refcount, 0, PROCESSOR_COUNT_MAX * sizeof(struct ckpt_refcnt));
+}
+
+static void ckpt_recovery_inititialize () {
+ struct list_head *checkpoint_list;
+ struct saCkptCheckpoint *checkpoint;
+ struct checkpoint_sync *sync_msg;
+ struct saCkptCheckpoint *savedCheckpoint;
+ struct list_head *checkpoint_section_list;
+ struct saCkptCheckpointSection *ckptCheckpointSection;
+
+
+ for (checkpoint_list = checkpoint_list_head.next;
+ checkpoint_list != &checkpoint_list_head;
+ checkpoint_list = checkpoint_list->next) {
+
+ checkpoint = list_entry (checkpoint_list,
+ struct saCkptCheckpoint, list);
+
+ //Save off the elements in the new list
+ savedCheckpoint =
+ (struct saCkptCheckpoint *) malloc (sizeof(struct saCkptCheckpoint));
+ assert(savedCheckpoint);
+ memcpy(savedCheckpoint, checkpoint, sizeof(struct saCkptCheckpoint));
+ list_init(&savedCheckpoint->list);
+ list_add(&savedCheckpoint->list,&checkpoint_recovery_list_head);
+ SaUint32T sync_sequence_number = 0;
+
+ for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next;
+ checkpoint_section_list != &checkpoint->checkpointSectionsListHead;
+ checkpoint_section_list = checkpoint_section_list->next) {
+
+ ckptCheckpointSection = list_entry (checkpoint_section_list,
+ struct saCkptCheckpointSection, list);
+
+ SaSizeT origSectionSize = ckptCheckpointSection->sectionDescriptor.sectionSize;
+ SaSizeT sectionDataSent = 0;
+ SaSizeT newSectionSize = 0;
+
+ while (sectionDataSent < origSectionSize) {
+ if ((origSectionSize - sectionDataSent) > CKPT_MAX_SECTION_DATA_SEND) { // Send a Max of 400K of section data
+ newSectionSize = CKPT_MAX_SECTION_DATA_SEND;
+ }
+ else {
+ newSectionSize = (origSectionSize - sectionDataSent);
+ }
+
+ //Create and save a new Sync message.
+ sync_msg =
+ (struct checkpoint_sync *)malloc(sizeof(struct checkpoint_sync));
+ assert(sync_msg);
+ memcpy(&sync_msg->request.checkpointName, &savedCheckpoint->name, sizeof(SaNameT));
+ memcpy(&sync_msg->request.checkpointCreationAttributes,
+ &savedCheckpoint->checkpointCreationAttributes,
+ sizeof(SaCkptCheckpointCreationAttributesT));
+ memcpy(&sync_msg->request.sectionDescriptor,
+ &ckptCheckpointSection->sectionDescriptor,
+ sizeof(SaCkptSectionDescriptorT));
+ memcpy((char*)sync_msg->request.sectionData,
+ ((char*)ckptCheckpointSection->sectionData + sectionDataSent),
+ newSectionSize);
+ memcpy (&sync_msg->request.dataOffSet, §ionDataSent, sizeof(SaUint32T));
+ memcpy (&sync_msg->request.dataSize, &newSectionSize, sizeof(SaUint32T));
+ memcpy(&sync_msg->request.previous_ring_id, &saved_ring_id, sizeof(struct memb_ring_id));
+ memcpy(&sync_msg->request.source_addr, &this_ip.sin_addr, sizeof(struct in_addr));
+ memcpy(&sync_msg->request.sync_msg_sequence_number, &sync_sequence_number,sizeof(SaUint32T));
+
+ int proc_index = findProcessorIndex(&this_ip.sin_addr,savedCheckpoint->ckpt_refcount);
+
+ if (proc_index != -1 ) {
+ memcpy(&sync_msg->request.ref_count,
+ &savedCheckpoint->ckpt_refcount[proc_index].count,
+ sizeof(SaUint32T));
+ }
+ else {
+ log_printf (LOG_LEVEL_ERROR,
+ "CKPT: Could not find the checkpoint entry for this processor %p \n",
+ checkpoint);
+ continue;
+ }
+ sync_sequence_number++;
+ sectionDataSent += newSectionSize;
+ list_init(&sync_msg->list);
+ list_add(&sync_msg->list,&recovery_sync_list_head);
+ }
+ }
+ }
+#ifdef TODO
+ int res = totempg_token_callback_create(&tok_call_handle,
+ TOTEMPG_CALLBACK_TOKEN_SENT,
+ 0,
+ ckpt_recovery_process,
+ NULL);
+#endif
+}
+
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *data ) {
+
+ struct req_exec_ckpt_synchronize_state *request_exec_sync_state;
+ struct iovec iovecs[2];
+ struct list_head *syncList;
+ struct checkpoint_sync *sync_element;
+
+ //Check for empty list here
+ if (list_empty(&recovery_sync_list_head)) {
+ return (0);
+ }
+ //Extract the element
+ syncList = recovery_sync_list_head.next;
+ sync_element = list_entry (syncList,
+ struct checkpoint_sync, list);
+
+ request_exec_sync_state = &sync_element->request;
+
+ log_printf (LOG_LEVEL_DEBUG, "callback recover_process.\n");
+ request_exec_sync_state->header.size =
+ sizeof (struct req_exec_ckpt_synchronize_state);
+ request_exec_sync_state->header.id = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE;
+
+ iovecs[0].iov_base = (char *)&request_exec_sync_state;
+ iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_state);
+
+ assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
+ list_del(&sync_element->list);
+ free(sync_element);
+
+ //Check for empty list here
+ if (list_empty(&recovery_sync_list_head)) {
+ //todo Call finalize here.
+ return (0);
+ }
+#ifdef TODO
+ //We have more to send ...
+ int res = totempg_token_callback_create(&tok_call_handle,
+ TOTEMPG_CALLBACK_TOKEN_SENT,
+ 0,
+ ckpt_recovery_process,
+ NULL);
+#endif
+}
+
+static int ckpt_recovery_finalize () {
+ struct list_head *checkpoint_list;
+ struct saCkptCheckpoint *checkpoint;
+
+ checkpoint_list = checkpoint_list_head.next;
+ while (checkpoint_list != &checkpoint_list_head) {
+ checkpoint = list_entry (checkpoint_list,
+ struct saCkptCheckpoint, list);
+ list_del(&checkpoint->list);
+ free(checkpoint);
+ checkpoint_list = checkpoint_list_head.next;
+ }
+
+ list_init(&checkpoint_list_head);
+
+ memcpy(&checkpoint_list_head, &checkpoint_recovery_list_head, sizeof(struct list_head));
+
+ list_init(&checkpoint_recovery_list_head);
+
+ return (0);
+}
+
+static int ckpt_recovery_abort () {
+//@TODO add some code here to abort the recovery process
+ return (0);
+}
+
+static int ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries) {
+ struct list_head *checkpoint_list;
+ struct saCkptCheckpoint *checkpoint;
+ struct in_addr *member;
+
+ if (left_list_entries == 0) {
+ return (0);
+ }
+ // Iterate left_list_entries - 1 times and at least once.
+ int i = 0;
+ do {
+ member = left_list;
+ for (checkpoint_list = checkpoint_list_head.next;
+ checkpoint_list != &checkpoint_list_head;
+ checkpoint_list = checkpoint_list->next) {
+
+ checkpoint = list_entry (checkpoint_list,
+ struct saCkptCheckpoint, list);
+
+ int index = findProcessorIndex(member, checkpoint->ckpt_refcount);
+
+ if (index == -1) {
+ continue;
+ }
+
+ checkpoint->ckpt_refcount[index].count = 0;
+ memset((char*)&checkpoint->ckpt_refcount[index].addr, 0, sizeof(struct in_addr));
+
+ if (checkpoint->referenceCount > 0) {
+ checkpoint->referenceCount--;
+ }
+
+ }
+ i++;
+ left_list++;
+ } while (i < (left_list_entries - 1));
+ return(0);
+}
+
+static int ckpt_confchg_fn (
+ enum totempg_configuration_type configuration_type,
+ struct in_addr *member_list, void *member_list_private,
+ int member_list_entries,
+ struct in_addr *left_list, void *left_list_private,
+ int left_list_entries,
+ struct in_addr *joined_list, void *joined_list_private,
+ int joined_list_entries,
+ struct memb_ring_id *ring_id) {
+
+ if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
+#ifdef TODO
+ totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
+#endif
+ memcpy (&saved_ring_id, ring_id, sizeof(struct memb_ring_id));//RECOVERY_MUNI
+ }
+
+ else if (configuration_type == TOTEMPG_CONFIGURATION_TRANSITIONAL) {
+ ckpt_recovery_process_members_exit(left_list, left_list_entries);
+ ckpt_recovery_inititialize ();
+ }
+
+ return (0);
+}
static struct saCkptCheckpoint *ckpt_checkpoint_find_global (SaNameT *name)
{
- struct list_head *checkpointList;
+ struct list_head *checkpoint_list;
struct saCkptCheckpoint *checkpoint;
- for (checkpointList = checkpointListHead.next;
- checkpointList != &checkpointListHead;
- checkpointList = checkpointList->next) {
+ for (checkpoint_list = checkpoint_list_head.next;
+ checkpoint_list != &checkpoint_list_head;
+ checkpoint_list = checkpoint_list->next) {
- checkpoint = list_entry (checkpointList,
+ checkpoint = list_entry (checkpoint_list,
struct saCkptCheckpoint, list);
if (name_match (name, &checkpoint->name)) {
@@ -320,15 +610,15 @@
char *id,
int idLen)
{
- struct list_head *checkpointSectionList;
+ struct list_head *checkpoint_section_list;
struct saCkptCheckpointSection *ckptCheckpointSection;
log_printf (LOG_LEVEL_DEBUG, "Finding checkpoint section id %s %d\n", id, idLen);
- for (checkpointSectionList = ckptCheckpoint->checkpointSectionsListHead.next;
- checkpointSectionList != &ckptCheckpoint->checkpointSectionsListHead;
- checkpointSectionList = checkpointSectionList->next) {
+ for (checkpoint_section_list = ckptCheckpoint->checkpointSectionsListHead.next;
+ checkpoint_section_list != &ckptCheckpoint->checkpointSectionsListHead;
+ checkpoint_section_list = checkpoint_section_list->next) {
- ckptCheckpointSection = list_entry (checkpointSectionList,
+ ckptCheckpointSection = list_entry (checkpoint_section_list,
struct saCkptCheckpointSection, list);
log_printf (LOG_LEVEL_DEBUG, "Checking section id %*s\n",
@@ -406,7 +696,8 @@
{
// Initialize the saved ring ID.
saved_ring_id.seq = 0;
- saved_ring_id.rep.s_addr = this_ip.sin_addr.s_addr;
+ saved_ring_id.rep.s_addr = this_ip.sin_addr.s_addr;
+
#ifdef TODO
int res;
res = totempg_recovery_plug_create (&ckpt_checkpoint_recovery_plug_handle);
@@ -511,10 +802,11 @@
ckptCheckpoint->unlinked = 0;
list_init (&ckptCheckpoint->list);
list_init (&ckptCheckpoint->checkpointSectionsListHead);
- list_add (&ckptCheckpoint->list, &checkpointListHead);
+ list_add (&ckptCheckpoint->list, &checkpoint_list_head);
ckptCheckpoint->referenceCount = 0;
ckptCheckpoint->retention_timer = 0;
ckptCheckpoint->expired = 0;
+ initialize_ckpt_refcount_array(ckptCheckpoint->ckpt_refcount);
/*
* Add in default checkpoint section
@@ -548,7 +840,24 @@
*/
log_printf (LOG_LEVEL_DEBUG, "CHECKPOINT opened is %p\n", ckptCheckpoint);
ckptCheckpoint->referenceCount += 1;
-
+
+ // Add the connection reference information to the Checkpoint to be
+ // sent out later as a part of the sync process.
+ // RECOVERY_MUNI
+
+ int proc_index = findProcessorIndex(&source_addr,ckptCheckpoint->ckpt_refcount);
+ if (proc_index == -1) {//Could not find, lets set the processor to an index.
+ proc_index = setProcessorIndex(&source_addr,ckptCheckpoint->ckpt_refcount);
+ }
+ if (proc_index != -1 ) {
+ ckptCheckpoint->ckpt_refcount[proc_index].addr = source_addr;
+ ckptCheckpoint->ckpt_refcount[proc_index].count++;
+ }
+ else {
+ log_printf (LOG_LEVEL_ERROR,
+ "CKPT: MAX LIMIT OF PROCESSORS reached. Cannot store new proc %p info.\n",
+ ckptCheckpoint);
+ }
/*
* Reset retention duration since this checkpoint was just opened
*/
@@ -585,6 +894,46 @@
return (0);
}
+//RECOVERY_MUNI
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required) {
+ int retcode;
+ struct req_exec_ckpt_synchronize_state *req_exec_ckpt_sync_state
+ = (struct req_exec_ckpt_synchronize_state *)message;
+ if (memcmp (&req_exec_ckpt_sync_state->previous_ring_id, &saved_ring_id,sizeof (struct memb_ring_id)) != 0) {
+ return(0);
+ }
+ // Only do an open/section create for the 1st message for a checkpoint from a given processor.
+ if (req_exec_ckpt_sync_state->sync_msg_sequence_number == 0) {
+ struct req_exec_ckpt_checkpointopen request_open_exec;
+ struct req_lib_ckpt_checkpointopen request_open_lib;
+
+ request_open_lib.checkpointName = req_exec_ckpt_sync_state->checkpointName;
+ request_open_lib.checkpointCreationAttributes = req_exec_ckpt_sync_state->checkpointCreationAttributes;
+ request_open_exec.req_lib_ckpt_checkpointopen = request_open_lib;
+ message_handler_req_exec_ckpt_checkpointopen(&request_open_exec, req_exec_ckpt_sync_state->source_addr, 0);
+
+ retcode = recovery_section_create (&req_exec_ckpt_sync_state->sectionDescriptor,
+ &req_exec_ckpt_sync_state->checkpointName);
+ if (retcode != SA_AIS_OK) {
+ log_printf(LOG_LEVEL_ERROR, "CKPT: message_handler_req_exec_ckpt_synchronize_state\n");
+ log_printf(LOG_LEVEL_ERROR, "CKPT: recovery_section_create returned %d\n",retcode);
+ return (0);
+ }
+ }
+ // Write the contents of the section to the checkpoint section.
+ retcode = recovery_section_write(&req_exec_ckpt_sync_state->sectionDescriptor,
+ &req_exec_ckpt_sync_state->checkpointName,
+ req_exec_ckpt_sync_state->sectionData,
+ req_exec_ckpt_sync_state->dataOffSet,
+ req_exec_ckpt_sync_state->dataSize);
+ if (retcode != SA_AIS_OK) {
+ log_printf(LOG_LEVEL_ERROR, "CKPT: message_handler_req_exec_ckpt_synchronize_state\n");
+ log_printf(LOG_LEVEL_ERROR, "CKPT: recovery_section_write returned %d\n",retcode);
+ }
+
+ return (0);
+}
+
unsigned int abstime_to_msec (SaTimeT time)
{
struct timeval tv;
@@ -644,6 +993,19 @@
}
checkpoint->referenceCount--;
+ // Modify the connection reference information to the Checkpoint to be
+ // sent out later as a part of the sync process.
+ // RECOVERY_MUNI
+
+ int proc_index = findProcessorIndex(&source_addr, checkpoint->ckpt_refcount);
+ if (proc_index != -1 ) {
+ checkpoint->ckpt_refcount[proc_index].count--;
+ }
+ else {
+ log_printf (LOG_LEVEL_ERROR,
+ "CKPT: Could Not find Processor Info %p info.\n",
+ checkpoint);
+ }
assert (checkpoint->referenceCount >= 0);
log_printf (LOG_LEVEL_DEBUG, "disconnect called, new CKPT ref count is %d\n",
checkpoint->referenceCount);
@@ -652,10 +1014,10 @@
* If checkpoint has been unlinked and this is the last reference, delete it
*/
if (checkpoint->unlinked && checkpoint->referenceCount == 0) {
- log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");
+ log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");
checkpoint_release (checkpoint);
} else
- if (checkpoint->referenceCount == 0) {
+ if (checkpoint->referenceCount == 0) {
poll_timer_add (aisexec_poll_handle,
checkpoint->checkpointCreationAttributes.retentionDuration / 1000000,
checkpoint,
@@ -791,6 +1153,86 @@
return (0);
}
+static int recovery_section_create (SaCkptSectionDescriptorT *sectionDescriptor,
+ SaNameT *checkpointName) {
+ struct saCkptCheckpoint *ckptCheckpoint;
+ struct saCkptCheckpointSection *ckptCheckpointSection;
+ void *initialData;
+ void *sectionId;
+ SaErrorT error = SA_AIS_OK;
+
+ ckptCheckpoint = ckpt_checkpoint_find_global (checkpointName);
+ if (ckptCheckpoint == 0) {
+ error = SA_AIS_ERR_NOT_EXIST;
+ goto error_exit;
+ }
+
+ // Determine if user-specified checkpoint ID already exists
+
+ ckptCheckpointSection = ckpt_checkpoint_find_globalSection (ckptCheckpoint,
+ ((char *)sectionDescriptor->sectionId.id),
+ (int)sectionDescriptor->sectionId.idLen);
+ if (ckptCheckpointSection) {
+ error = SA_AIS_ERR_EXIST;
+ goto error_exit;
+ }
+
+ // Allocate checkpoint section
+ ckptCheckpointSection = malloc (sizeof (struct saCkptCheckpointSection));
+ if (ckptCheckpointSection == 0) {
+ error = SA_AIS_ERR_NO_MEMORY;
+ goto error_exit;
+ }
+ // Allocate checkpoint section data
+ initialData = malloc (sectionDescriptor->sectionSize);
+ if (initialData == 0) {
+ free (ckptCheckpointSection);
+ error = SA_AIS_ERR_NO_MEMORY;
+ goto error_exit;
+ }
+ // Allocate checkpoint section id
+ sectionId = malloc ((int)sectionDescriptor->sectionId.idLen);
+ if (sectionId == 0) {
+ free (ckptCheckpointSection);
+ free (initialData);
+ error = SA_AIS_ERR_NO_MEMORY;
+ goto error_exit;
+ }
+
+ // Copy checkpoint section ID and initialize data.
+ memcpy (sectionId, ((char *)sectionDescriptor->sectionId.id),
+ (int)sectionDescriptor->sectionId.idLen);
+
+ memset (initialData, 0, sectionDescriptor->sectionSize);
+
+ // Configure checkpoint section
+ ckptCheckpointSection->sectionDescriptor.sectionId.id = sectionDescriptor->sectionId.id;
+ ckptCheckpointSection->sectionDescriptor.sectionId.idLen = sectionDescriptor->sectionId.idLen;
+ ckptCheckpointSection->sectionDescriptor.sectionSize = sectionDescriptor->sectionSize;
+ ckptCheckpointSection->sectionDescriptor.expirationTime = sectionDescriptor->expirationTime;
+ ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID;
+ ckptCheckpointSection->sectionDescriptor.lastUpdate = sectionDescriptor->lastUpdate;
+ ckptCheckpointSection->sectionData = initialData;
+ ckptCheckpointSection->expiration_timer = 0;
+
+ if (sectionDescriptor->expirationTime != SA_TIME_END) {
+ poll_timer_add (aisexec_poll_handle,
+ abstime_to_msec (ckptCheckpointSection->sectionDescriptor.expirationTime),
+ ckptCheckpointSection,
+ timer_function_section_expire,
+ &ckptCheckpointSection->expiration_timer);
+ }
+
+ // Add checkpoint section to checkpoint
+ list_init (&ckptCheckpointSection->list);
+ list_add (&ckptCheckpointSection->list,
+ &ckptCheckpoint->checkpointSectionsListHead);
+
+error_exit:
+ return (error);
+
+}
+
static int message_handler_req_exec_ckpt_sectioncreate (void *message, struct in_addr source_addr, int endian_conversion_required) {
struct req_exec_ckpt_sectioncreate *req_exec_ckpt_sectioncreate = (struct req_exec_ckpt_sectioncreate *)message;
struct req_lib_ckpt_sectioncreate *req_lib_ckpt_sectioncreate = (struct req_lib_ckpt_sectioncreate *)&req_exec_ckpt_sectioncreate->req_lib_ckpt_sectioncreate;
@@ -1015,6 +1457,62 @@
return (0);
}
+static int recovery_section_write(SaCkptSectionDescriptorT *sectionDescriptor,
+ SaNameT *checkpointName,
+ void *newData,
+ SaUint32T dataOffSet,
+ SaUint32T dataSize) {
+ struct saCkptCheckpoint *ckptCheckpoint;
+ struct saCkptCheckpointSection *ckptCheckpointSection;
+ int sizeRequired;
+ void *sectionData;
+ SaErrorT error = SA_AIS_OK;
+
+ log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery_section_write.\n");
+ ckptCheckpoint = ckpt_checkpoint_find_global (checkpointName);
+ if (ckptCheckpoint == 0) {
+ error = SA_AIS_ERR_NOT_EXIST;
+ goto error_exit;
+ }
+
+ // Find checkpoint section to be written
+ ckptCheckpointSection = ckpt_checkpoint_find_globalSection (ckptCheckpoint,
+ ((char *)sectionDescriptor->sectionId.id),
+ (int)sectionDescriptor->sectionId.idLen);
+ if (ckptCheckpointSection == 0) {
+ error = SA_AIS_ERR_NOT_EXIST;
+ goto error_exit;
+ }
+
+ // If write would extend past end of section data, enlarge section
+ sizeRequired = dataOffSet + dataSize;
+ if (sizeRequired > ckptCheckpointSection->sectionDescriptor.sectionSize) {
+ sectionData = realloc (ckptCheckpointSection->sectionData, sizeRequired);
+ if (sectionData == 0) {
+ error = SA_AIS_ERR_NO_MEMORY;
+ goto error_exit;
+ }
+
+ // Install new section data
+ ckptCheckpointSection->sectionData = sectionData;
+ ckptCheckpointSection->sectionDescriptor.sectionSize = sizeRequired;
+ }
+
+ // Write checkpoint section to section data
+ if (dataSize > 0) {
+ char *sd;
+ int *val;
+ val = ckptCheckpointSection->sectionData;
+ sd = (char *)ckptCheckpointSection->sectionData;
+ memcpy (&sd[dataOffSet],
+ newData,
+ dataSize);
+ }
+error_exit:
+ return (error);
+}
+
+
static int message_handler_req_exec_ckpt_sectionwrite (void *message, struct in_addr source_addr, int endian_conversion_required) {
struct req_exec_ckpt_sectionwrite *req_exec_ckpt_sectionwrite = (struct req_exec_ckpt_sectionwrite *)message;
struct req_lib_ckpt_sectionwrite *req_lib_ckpt_sectionwrite = (struct req_lib_ckpt_sectionwrite *)&req_exec_ckpt_sectionwrite->req_lib_ckpt_sectionwrite;
@@ -1258,7 +1756,7 @@
conn_info->ais_ci.u.libckpt_ci.sectionIterator.iteratorCount = 0;
conn_info->ais_ci.u.libckpt_ci.sectionIterator.iteratorPos = 0;
list_add (&conn_info->ais_ci.u.libckpt_ci.sectionIterator.list,
- &checkpointIteratorListHead);
+ &checkpoint_iterator_list_head);
list_init (&conn_info->ais_ci.u.libckpt_ci.checkpoint_list);
error = SA_AIS_OK;
}
@@ -1400,7 +1898,7 @@
struct saCkptCheckpoint *checkpoint;
int memoryUsed = 0;
int numberOfSections = 0;
- struct list_head *checkpointSectionList;
+ struct list_head *checkpoint_section_list;
struct saCkptCheckpointSection *checkpointSection;
log_printf (LOG_LEVEL_DEBUG, "in status get\n");
@@ -1410,11 +1908,11 @@
*/
checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_checkpointstatusget->checkpointName);
- for (checkpointSectionList = checkpoint->checkpointSectionsListHead.next;
- checkpointSectionList != &checkpoint->checkpointSectionsListHead;
- checkpointSectionList = checkpointSectionList->next) {
+ for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next;
+ checkpoint_section_list != &checkpoint->checkpointSectionsListHead;
+ checkpoint_section_list = checkpoint_section_list->next) {
- checkpointSection = list_entry (checkpointSectionList,
+ checkpointSection = list_entry (checkpoint_section_list,
struct saCkptCheckpointSection, list);
memoryUsed += checkpointSection->sectionDescriptor.sectionSize;
@@ -1730,7 +2228,7 @@
struct saCkptCheckpointSection *ckptCheckpointSection;
struct saCkptSectionIteratorEntry *ckptSectionIteratorEntries;
struct saCkptSectionIterator *ckptSectionIterator;
- struct list_head *checkpointSectionList;
+ struct list_head *checkpoint_section_list;
int addEntry = 0;
int iteratorEntries = 0;
SaErrorT error = SA_AIS_OK;
@@ -1747,11 +2245,11 @@
/*
* Iterate list of checkpoint sections
*/
- for (checkpointSectionList = ckptCheckpoint->checkpointSectionsListHead.next;
- checkpointSectionList != &ckptCheckpoint->checkpointSectionsListHead;
- checkpointSectionList = checkpointSectionList->next) {
+ for (checkpoint_section_list = ckptCheckpoint->checkpointSectionsListHead.next;
+ checkpoint_section_list != &ckptCheckpoint->checkpointSectionsListHead;
+ checkpoint_section_list = checkpoint_section_list->next) {
- ckptCheckpointSection = list_entry (checkpointSectionList,
+ ckptCheckpointSection = list_entry (checkpoint_section_list,
struct saCkptCheckpointSection, list);
addEntry = 1;
--- ../latest/include/ipc_ckpt.h 2005-02-23 12:56:16.000000000 -0600
+++ ../bk_openais/include/ipc_ckpt.h 2005-02-23 10:56:31.000000000 -0600
@@ -37,6 +37,8 @@
#include "../include/ipc_gen.h"
#include "../include/ais_types.h"
#include "../include/saCkpt.h"
+#include "../exec/totemsrp.h"
+#include "../exec/ckpt.h"
enum req_lib_ckpt_checkpoint_types {
MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN = 1,
@@ -326,4 +328,18 @@
struct res_header header;
};
+struct req_exec_ckpt_synchronize_state {
+ struct req_header header;
+ struct memb_ring_id previous_ring_id;
+ SaNameT checkpointName;
+ SaCkptCheckpointCreationAttributesT checkpointCreationAttributes;
+ SaCkptSectionDescriptorT sectionDescriptor;
+ void *sectionData;
+ SaUint32T dataOffSet;
+ SaUint32T dataSize;
+ struct in_addr source_addr;
+ SaUint32T ref_count;
+ SaUint32T sync_msg_sequence_number;
+};
+
#endif /* IPC_CKPT_H_DEFINED */
--- ../latest/include/ipc_gen.h 2005-02-23 12:56:16.000000000 -0600
+++ ../bk_openais/include/ipc_gen.h 2005-02-17 13:59:10.000000000 -0600
@@ -69,6 +69,7 @@
MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE,
MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE,
MESSAGE_REQ_EXEC_CKPT_SECTIONREAD,
+ MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE,
MESSAGE_REQ_EXEC_EVT_EVENTDATA,
MESSAGE_REQ_EXEC_EVT_CHANCMD,
MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA
More information about the Openais
mailing list