[Openais] Patch II design review revision 2 !!
Muni Bajpai
muniba at nortel.com
Fri Feb 25 14:10:20 PST 2005
Skipped content of type multipart/alternative-------------- next part --------------
diff -uNr --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet --exclude=init --exclude=LICENSE --exclude=Makefile --exclude=man --exclude=README.devmap --exclude=SECURITY --exclude=TODO --exclude=CHANGELOG --exclude=conf --exclude=loc --exclude=Makefile.samples --exclude=QUICKSTART --exclude=test --exclude=.cdtproject --exclude=.project ../latest/exec/ckpt.c ../bk_openais/exec/ckpt.c
--- ../latest/exec/ckpt.c 2005-02-25 14:07:12.000000000 -0600
+++ ../bk_openais/exec/ckpt.c 2005-02-25 15:07:03.000000000 -0600
@@ -16,7 +16,6 @@
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the MontaVista Software, Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
@@ -57,18 +56,36 @@
#include "totempg.h"
#define LOG_SERVICE LOG_SERVICE_CKPT
+#define CKPT_MAX_SECTION_DATA_SEND (1024*400)
#include "print.h"
-DECLARE_LIST_INIT(checkpointListHead);
+DECLARE_LIST_INIT(checkpoint_list_head);
-DECLARE_LIST_INIT(checkpointIteratorListHead);
+DECLARE_LIST_INIT(checkpoint_iterator_list_head);
+
+DECLARE_LIST_INIT(recovery_sync_list_head);
+
+DECLARE_LIST_INIT(checkpoint_recovery_list_head);
struct checkpoint_cleanup {
struct list_head list;
struct saCkptCheckpoint *checkpoint;
};
-//TODO static totempg_recovery_plug_handle ckpt_checkpoint_recovery_plug_handle;
+/*
+ * One queued recovery (sync) message. Elements are linked on
+ * recovery_sync_list_head when recovery is initialized and are multicast
+ * and freed by ckpt_recovery_process().
+ *
+ * Section messages are allocated with extra room: the section data is
+ * stored immediately after this structure, at offset
+ * sizeof (struct checkpoint_sync).
+ */
+struct checkpoint_sync {
+ struct list_head list;
+ enum nodeexec_message_types type;
+ union {
+ struct req_exec_ckpt_synchronize_state request_state;
+ struct req_exec_ckpt_synchronize_section request_section;
+ } base;
+};
+
+#ifdef TODO
+static void *tok_call_handle = 0;
+#endif
+
+/* TODO static totempg_recovery_plug_handle ckpt_checkpoint_recovery_plug_handle; */
static int ckpt_exec_init_fn (void);
@@ -78,6 +95,10 @@
static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct in_addr source_addr, int endian_conversion_required);
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required);
+
+static int message_handler_req_exec_ckpt_synchronize_section (void *message, struct in_addr source_addr, int endian_conversion_required);
+
static int message_handler_req_exec_ckpt_checkpointclose (void *message, struct in_addr source_addr, int endian_conversion_required);
static int message_handler_req_exec_ckpt_checkpointunlink (void *message, struct in_addr source_addr, int endian_conversion_required);
@@ -133,24 +154,27 @@
static int message_handler_req_lib_ckpt_sectioniteratorinitialize (struct conn_info *conn_info, void *message);
static int message_handler_req_lib_ckpt_sectioniteratornext (struct conn_info *conn_info, void *message);
-static int ckpt_confchg_fn (
- enum totempg_configuration_type configuration_type,
- struct in_addr *member_list, void *member_list_private,
- int member_list_entries,
- struct in_addr *left_list, void *left_list_private,
- int left_list_entries,
- struct in_addr *joined_list, void *joined_list_private,
- int joined_list_entries,
- struct memb_ring_id *ring_id) {
+static void ckpt_recovery_inititialize ();
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *);
+static void ckpt_recovery_finalize();
+static int ckpt_recovery_abort();
+static void ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries);
+static int recovery_section_create (SaCkptSectionDescriptorT *sectionDescriptor, SaNameT *checkpointName);
+static int recovery_section_write(SaCkptSectionIdT *sectionId, SaNameT *checkpointName,
+ void *newData, SaUint32T dataOffSet, SaUint32T dataSize);
-#ifdef TODO
- if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
- totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
- }
-#endif
- return (0);
-}
+static struct memb_ring_id saved_ring_id;
+
+static int ckpt_confchg_fn(
+ enum totempg_configuration_type configuration_type,
+ struct in_addr *member_list, void *member_list_private,
+ int member_list_entries,
+ struct in_addr *left_list, void *left_list_private,
+ int left_list_entries,
+ struct in_addr *joined_list, void *joined_list_private,
+ int joined_list_entries,
+ struct memb_ring_id *ring_id);
struct libais_handler ckpt_libais_handlers[] =
{
@@ -231,7 +255,7 @@
},
{ /* 15 */
.libais_handler_fn = message_handler_req_lib_ckpt_checkpointsynchronizeasync,
- .response_size = sizeof (struct res_lib_ckpt_checkpointsynchronizeasync), // TODO RESPONSE
+ .response_size = sizeof (struct res_lib_ckpt_checkpointsynchronizeasync), /* TODO RESPONSE */
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC,
},
{ /* 16 */
@@ -258,7 +282,9 @@
message_handler_req_exec_ckpt_sectionexpirationtimeset,
message_handler_req_exec_ckpt_sectionwrite,
message_handler_req_exec_ckpt_sectionoverwrite,
- message_handler_req_exec_ckpt_sectionread
+ message_handler_req_exec_ckpt_sectionread,
+ message_handler_req_exec_ckpt_synchronize_state,
+ message_handler_req_exec_ckpt_synchronize_section
};
struct service_handler ckpt_service_handler = {
@@ -273,18 +299,395 @@
.exec_dump_fn = 0
};
-static struct memb_ring_id saved_ring_id;
+/*
+ * Claim the first unused entry of ckpt_refcount for proc_addr.
+ *
+ * Entries are examined in order. An entry whose address is 0 is treated
+ * as unused: proc_addr is stored there and the entry's index is returned.
+ * Returns -1 when proc_addr is met before any unused entry (it already
+ * owns an entry) or when all PROCESSOR_COUNT_MAX entries are in use.
+ */
+static int setProcessorIndex(struct in_addr *proc_addr,
+	struct ckpt_refcnt *ckpt_refcount) {
+	int slot = 0;
+
+	while (slot < PROCESSOR_COUNT_MAX) {
+		struct ckpt_refcnt *entry = &ckpt_refcount[slot];
+
+		if (entry->addr.s_addr == proc_addr->s_addr) {
+			/* proc_addr already owns an entry. */
+			return -1;
+		}
+		if (entry->addr.s_addr == 0) {
+			/* Unused entry: record the address here. */
+			entry->addr = *proc_addr;
+			return slot;
+		}
+		slot++;
+	}
+
+	/* Every entry is taken by some other processor. */
+	return -1;
+}
+
+/*
+ * Look up proc_addr in the ckpt_refcount table.
+ *
+ * Returns the index of the first of the PROCESSOR_COUNT_MAX entries whose
+ * stored address equals proc_addr, or -1 when no entry matches.
+ */
+static int findProcessorIndex(struct in_addr *proc_addr,
+	struct ckpt_refcnt *ckpt_refcount) {
+	int slot = 0;
+
+	while (slot < PROCESSOR_COUNT_MAX
+		&& ckpt_refcount[slot].addr.s_addr != proc_addr->s_addr) {
+		slot++;
+	}
+
+	return (slot < PROCESSOR_COUNT_MAX) ? slot : -1;
+}
+
+/*
+ * Zero all PROCESSOR_COUNT_MAX entries of a per-processor reference
+ * count table.
+ */
+static void initialize_ckpt_refcount_array (struct ckpt_refcnt *ckpt_refcount) {
+	memset(ckpt_refcount, 0, sizeof(ckpt_refcount[0]) * PROCESSOR_COUNT_MAX);
+}
+
+/*
+ * Build the recovery state for a configuration change.
+ *
+ * For every checkpoint on checkpoint_list_head:
+ *  - a copy of the checkpoint structure is placed on
+ *    checkpoint_recovery_list_head, and
+ *  - for every section, one SYNCHRONIZESTATE message and then zero or
+ *    more SYNCHRONIZESECTION messages (each carrying at most
+ *    CKPT_MAX_SECTION_DATA_SEND bytes of section data) are created and
+ *    added to recovery_sync_list_head.
+ *
+ * Sections of a checkpoint that has no reference count entry for this
+ * processor are skipped with an error log. Allocation failures abort
+ * through assert().
+ *
+ * NOTE(review): the checkpoint copy is a plain memcpy, so its
+ * checkpointSectionsListHead still refers to the original checkpoint's
+ * sections - confirm this sharing is intended.
+ * NOTE(review): messages are queued with list_add(); if that inserts at
+ * the front of the list, ckpt_recovery_process() (which dequeues from the
+ * front) would send section data before the state message that creates
+ * the section - confirm the intended ordering.
+ */
+static void ckpt_recovery_inititialize () {
+	struct list_head *checkpoint_list;
+	struct saCkptCheckpoint *checkpoint;
+	struct checkpoint_sync *sync_msg;
+	struct checkpoint_sync *sync_section_msg;
+	struct saCkptCheckpoint *savedCheckpoint;
+	struct list_head *checkpoint_section_list;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	SaSizeT origSectionSize;
+	SaSizeT sectionDataSent;
+	SaSizeT newSectionSize;
+	int proc_index;
+
+
+	for (checkpoint_list = checkpoint_list_head.next;
+		checkpoint_list != &checkpoint_list_head;
+		checkpoint_list = checkpoint_list->next) {
+
+		checkpoint = list_entry (checkpoint_list,
+			struct saCkptCheckpoint, list);
+
+		/*
+		 * Save off the elements in the new list
+		 */
+		savedCheckpoint =
+			(struct saCkptCheckpoint *) malloc (sizeof(struct saCkptCheckpoint));
+		assert(savedCheckpoint);
+		memcpy(savedCheckpoint, checkpoint, sizeof(struct saCkptCheckpoint));
+		list_init(&savedCheckpoint->list);
+		list_add(&savedCheckpoint->list,&checkpoint_recovery_list_head);
+
+		for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next;
+			checkpoint_section_list != &checkpoint->checkpointSectionsListHead;
+			checkpoint_section_list = checkpoint_section_list->next) {
+
+			ckptCheckpointSection = list_entry (checkpoint_section_list,
+				struct saCkptCheckpointSection, list);
+
+			proc_index = findProcessorIndex(&this_ip.sin_addr,savedCheckpoint->ckpt_refcount);
+
+			if (proc_index == -1 ) {
+				log_printf (LOG_LEVEL_ERROR,
+					"CKPT: Could not find the checkpoint entry for this processor %p \n",
+					checkpoint);
+				continue;
+			}
+
+			/*
+			 * Create and save a new Sync message.
+			 */
+			sync_msg =
+				(struct checkpoint_sync *)malloc(sizeof(struct checkpoint_sync));
+			assert(sync_msg);
+
+			sync_msg->type = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE;
+			sync_msg->base.request_state.header.size = sizeof (struct req_exec_ckpt_synchronize_state);
+			sync_msg->base.request_state.header.id = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE;
+			memcpy(&sync_msg->base.request_state.previous_ring_id, &saved_ring_id, sizeof(struct memb_ring_id));
+			memcpy(&sync_msg->base.request_state.checkpointName, &savedCheckpoint->name, sizeof(SaNameT));
+			memcpy(&sync_msg->base.request_state.checkpointCreationAttributes,
+				&savedCheckpoint->checkpointCreationAttributes,
+				sizeof(SaCkptCheckpointCreationAttributesT));
+			memcpy(&sync_msg->base.request_state.sectionDescriptor,
+				&ckptCheckpointSection->sectionDescriptor,
+				sizeof(SaCkptSectionDescriptorT));
+			memcpy(&sync_msg->base.request_state.source_addr, &this_ip.sin_addr, sizeof(struct in_addr));
+			memcpy(&sync_msg->base.request_state.ref_count,
+				&savedCheckpoint->ckpt_refcount[proc_index].count,
+				sizeof(SaUint32T));
+
+			list_init(&sync_msg->list);
+			list_add(&sync_msg->list,&recovery_sync_list_head);
+
+			origSectionSize = ckptCheckpointSection->sectionDescriptor.sectionSize;
+			sectionDataSent = 0;
+			newSectionSize = 0;
+
+			/*
+			 * Now create SyncSection messages in chunks of
+			 * CKPT_MAX_SECTION_DATA_SEND or less
+			 */
+			while (sectionDataSent < origSectionSize) {
+				/*
+				 * Send a Max of CKPT_MAX_SECTION_DATA_SEND of section data
+				 */
+				if ((origSectionSize - sectionDataSent) > CKPT_MAX_SECTION_DATA_SEND) {
+					newSectionSize = CKPT_MAX_SECTION_DATA_SEND;
+				}
+				else {
+					newSectionSize = (origSectionSize - sectionDataSent);
+				}
+
+				/*
+				 * Create and save a new Sync Section message. The
+				 * section data chunk is stored directly after the
+				 * checkpoint_sync structure.
+				 */
+				sync_section_msg =
+					(struct checkpoint_sync *)malloc(sizeof(struct checkpoint_sync) + newSectionSize);
+				assert(sync_section_msg);
+
+				sync_section_msg->type = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION;
+				sync_section_msg->base.request_section.header.size = sizeof (struct req_exec_ckpt_synchronize_section);
+				sync_section_msg->base.request_section.header.id = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION;
+				memcpy (&sync_section_msg->base.request_section.checkpointName, &savedCheckpoint->name, sizeof(SaNameT));
+				memcpy (&sync_section_msg->base.request_section.sectionId,
+					&ckptCheckpointSection->sectionDescriptor.sectionId,
+					sizeof(SaCkptSectionIdT));
+				/*
+				 * Convert by assignment rather than copying the first
+				 * sizeof(SaUint32T) bytes of an SaSizeT, which would
+				 * depend on byte order if SaSizeT is wider.
+				 */
+				sync_section_msg->base.request_section.dataOffSet = (SaUint32T)sectionDataSent;
+				sync_section_msg->base.request_section.dataSize = (SaUint32T)newSectionSize;
+				memcpy ((char*)sync_section_msg + sizeof(struct checkpoint_sync),
+					((char*)ckptCheckpointSection->sectionData + sectionDataSent),
+					newSectionSize);
+
+				sectionDataSent += newSectionSize;
+				list_init(&sync_section_msg->list);
+				list_add(&sync_section_msg->list,&recovery_sync_list_head);
+			}
+
+		}
+	}
+#ifdef TODO
+	int res = totempg_token_callback_create(&tok_call_handle,
+		TOTEMPG_CALLBACK_TOKEN_SENT,
+		0,
+		ckpt_recovery_process,
+		NULL);
+#endif
+}
+
+/*
+ * Multicast queued recovery messages.
+ *
+ * Messages are taken from the front of recovery_sync_list_head, multicast
+ * with TOTEMPG_AGREED, then unlinked and freed. Processing stops when the
+ * queue is empty (ckpt_recovery_finalize() is then called) or when
+ * totempg_send_ok() refuses the next message, which is left queued.
+ *
+ * type, data: not used by this function.
+ * Returns 0 in all cases.
+ */
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *data ) {
+
+	struct req_exec_ckpt_synchronize_state *request_exec_sync_state;
+	struct req_exec_ckpt_synchronize_section *request_exec_sync_section;
+	struct iovec iovecs[2];
+	struct list_head *sync_list;
+	struct checkpoint_sync *sync_element;
+
+	/*
+	 * Check for empty list here
+	 */
+	if (list_empty(&recovery_sync_list_head)) {
+		return (0);
+	}
+
+	while (1) { /* Go for as long as the outbound queue is not full */
+		/*
+		 * Extract the element
+		 */
+		sync_list = recovery_sync_list_head.next;
+		sync_element = list_entry (sync_list,
+			struct checkpoint_sync, list);
+
+		log_printf (LOG_LEVEL_DEBUG, "callback recover_process.\n");
+		if (sync_element->type == MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE) {
+			/*
+			 * Populate the Sync State Request. iov_base must be the
+			 * message itself, not the address of the local pointer.
+			 */
+			request_exec_sync_state = &sync_element->base.request_state;
+			iovecs[0].iov_base = (char *)request_exec_sync_state;
+			iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_state);
+			/*
+			 * Check to see if we can queue the new message and if you can
+			 * then mcast the message else break and create callback.
+			 */
+			if (totempg_send_ok(iovecs[0].iov_len)){
+				assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
+				log_printf (LOG_LEVEL_DEBUG, "CKPT: Multicasted Sync State Message.\n");
+			}
+			else {
+				log_printf (LOG_LEVEL_DEBUG, "CKPT: Outbound Queue full need to Create Callback.\n");
+				break;
+			}
+		}
+
+		else {
+			/*
+			 * Populate the Sync Section Request
+			 */
+			request_exec_sync_section = &sync_element->base.request_section;
+			iovecs[0].iov_base = (char *)request_exec_sync_section;
+			iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_section);
+			/*
+			 * Populate the Section Data, which is stored directly
+			 * after the checkpoint_sync structure of this element.
+			 */
+			iovecs[1].iov_base = (char *)sync_element
+				+ sizeof (struct checkpoint_sync);
+			iovecs[1].iov_len = request_exec_sync_section->dataSize;
+			/*
+			 * Check to see if we can queue the new message and if you can
+			 * then mcast the message else break and create callback.
+			 */
+			if (totempg_send_ok(iovecs[0].iov_len + iovecs[1].iov_len)){
+				assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0);
+				log_printf (LOG_LEVEL_DEBUG, "CKPT: Multicasted Sync Section Message.\n");
+			}
+			else {
+				log_printf (LOG_LEVEL_DEBUG, "CKPT: Outbound Queue full need to Create Callback.\n");
+				break;
+			}
+		}
+		list_del(&sync_element->list);
+		free(sync_element);
+
+		/*
+		 * Check for empty list here
+		 */
+		if (list_empty(&recovery_sync_list_head)) {
+			ckpt_recovery_finalize();
+			return (0);
+		}
+	}
+#ifdef TODO
+	/*
+	 * We have more to send ...
+	 */
+	int res = totempg_token_callback_create(&tok_call_handle,
+		TOTEMPG_CALLBACK_TOKEN_SENT,
+		0,
+		ckpt_recovery_process,
+		NULL);
+#endif
+	/*
+	 * Reached when the outbound queue was full; a non-void function
+	 * must not fall off its end.
+	 */
+	return (0);
+}
+
+/*
+ * Replace the live checkpoint list with the recovery list.
+ *
+ * Every element of checkpoint_list_head is unlinked and freed, then every
+ * element of checkpoint_recovery_list_head is re-linked onto
+ * checkpoint_list_head. On return checkpoint_recovery_list_head is empty
+ * and ready for reuse.
+ *
+ * NOTE(review): only the checkpoint structures are freed here, not their
+ * sections; the recovery copies are made with memcpy and appear to share
+ * the section lists - confirm the section list heads remain valid after
+ * the originals are freed.
+ */
+static void ckpt_recovery_finalize () {
+	struct list_head *checkpoint_list;
+	struct saCkptCheckpoint *checkpoint;
+
+	/*
+	 * Remove all elements from the old checkpoint list
+	 */
+	while (!list_empty(&checkpoint_list_head)) {
+		checkpoint_list = checkpoint_list_head.next;
+		checkpoint = list_entry (checkpoint_list,
+			struct saCkptCheckpoint, list);
+		list_del(&checkpoint->list);
+		free(checkpoint);
+	}
+
+	/*
+	 * Initialize the old list again.
+	 */
+	list_init(&checkpoint_list_head);
+
+	/*
+	 * Move the recovered checkpoints one element at a time. Copying the
+	 * list head structure instead would leave the first and last
+	 * elements linked back to checkpoint_recovery_list_head.
+	 */
+	while (!list_empty(&checkpoint_recovery_list_head)) {
+		checkpoint_list = checkpoint_recovery_list_head.next;
+		list_del(checkpoint_list);
+		list_init(checkpoint_list);
+		list_add(checkpoint_list, &checkpoint_list_head);
+	}
+
+	/*
+	 * Initialize the new list head for reuse.
+	 */
+	list_init(&checkpoint_recovery_list_head);
+
+}
+
+/*
+ * Abort an in-progress recovery.
+ *
+ * TODO: not implemented yet; currently does nothing and returns 0.
+ */
+static int ckpt_recovery_abort () {
+	return 0;
+}
+
+/*
+ * Drop the checkpoint references held by processors that left the
+ * membership.
+ *
+ * left_list:         addresses of the departed processors.
+ * left_list_entries: number of entries in left_list.
+ *
+ * For each departed processor and each checkpoint on checkpoint_list_head
+ * that has a reference count entry for it, the entry's count is
+ * subtracted from the checkpoint's referenceCount and the entry is
+ * cleared (count and address set to 0) so it can be reused.
+ */
+static void ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries) {
+	struct list_head *checkpoint_list;
+	struct saCkptCheckpoint *checkpoint;
+	struct in_addr *member;
+	int index;
+	int i;
+
+	if (left_list_entries == 0) {
+		return;
+	}
+
+	/*
+	 * Iterate left_list_entries.
+	 */
+	member = left_list;
+	for (i = 0; i < left_list_entries; i++) {
+		for (checkpoint_list = checkpoint_list_head.next;
+			checkpoint_list != &checkpoint_list_head;
+			checkpoint_list = checkpoint_list->next) {
+
+			checkpoint = list_entry (checkpoint_list,
+				struct saCkptCheckpoint, list);
+			index = findProcessorIndex(member, checkpoint->ckpt_refcount);
+			if (index == -1) {
+				continue;
+			}
+			/*
+			 * Subtract the references held by the departed processor.
+			 */
+			if (checkpoint->referenceCount > 0) {
+				checkpoint->referenceCount -= checkpoint->ckpt_refcount[index].count;
+			} else if (checkpoint->ckpt_refcount[index].count != 0) {
+				/*
+				 * The departed processor still claims references on
+				 * a checkpoint whose total is already zero.
+				 */
+				log_printf (LOG_LEVEL_ERROR,
+					"CKPT: Reference count of checkpoint %p is already zero; cannot remove references of departed processor.\n",
+					checkpoint);
+			}
+			checkpoint->ckpt_refcount[index].count = 0;
+			memset((char*)&checkpoint->ckpt_refcount[index].addr, 0, sizeof(struct in_addr));
+		}
+		member++;
+	}
+	return;
+}
+
+/*
+ * Configuration change callback for the checkpoint service.
+ *
+ * On a regular configuration the new ring id is remembered in
+ * saved_ring_id. On a transitional configuration the references of the
+ * processors in left_list are dropped and recovery is initialized.
+ * Any other configuration type is ignored. Always returns 0.
+ */
+static int ckpt_confchg_fn (
+	enum totempg_configuration_type configuration_type,
+	struct in_addr *member_list, void *member_list_private,
+	int member_list_entries,
+	struct in_addr *left_list, void *left_list_private,
+	int left_list_entries,
+	struct in_addr *joined_list, void *joined_list_private,
+	int joined_list_entries,
+	struct memb_ring_id *ring_id) {
+
+	switch (configuration_type) {
+	case TOTEMPG_CONFIGURATION_REGULAR:
+#ifdef TODO
+		totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
+#endif
+		memcpy (&saved_ring_id, ring_id, sizeof(struct memb_ring_id));
+		break;
+
+	case TOTEMPG_CONFIGURATION_TRANSITIONAL:
+		ckpt_recovery_process_members_exit(left_list, left_list_entries);
+		ckpt_recovery_inititialize ();
+		break;
+
+	default:
+		break;
+	}
+
+	return (0);
+}
static struct saCkptCheckpoint *ckpt_checkpoint_find_global (SaNameT *name)
{
- struct list_head *checkpointList;
+ struct list_head *checkpoint_list;
struct saCkptCheckpoint *checkpoint;
- for (checkpointList = checkpointListHead.next;
- checkpointList != &checkpointListHead;
- checkpointList = checkpointList->next) {
+ for (checkpoint_list = checkpoint_list_head.next;
+ checkpoint_list != &checkpoint_list_head;
+ checkpoint_list = checkpoint_list->next) {
- checkpoint = list_entry (checkpointList,
+ checkpoint = list_entry (checkpoint_list,
struct saCkptCheckpoint, list);
if (name_match (name, &checkpoint->name)) {
@@ -320,15 +723,15 @@
char *id,
int idLen)
{
- struct list_head *checkpointSectionList;
+ struct list_head *checkpoint_section_list;
struct saCkptCheckpointSection *ckptCheckpointSection;
log_printf (LOG_LEVEL_DEBUG, "Finding checkpoint section id %s %d\n", id, idLen);
- for (checkpointSectionList = ckptCheckpoint->checkpointSectionsListHead.next;
- checkpointSectionList != &ckptCheckpoint->checkpointSectionsListHead;
- checkpointSectionList = checkpointSectionList->next) {
+ for (checkpoint_section_list = ckptCheckpoint->checkpointSectionsListHead.next;
+ checkpoint_section_list != &ckptCheckpoint->checkpointSectionsListHead;
+ checkpoint_section_list = checkpoint_section_list->next) {
- ckptCheckpointSection = list_entry (checkpointSectionList,
+ ckptCheckpointSection = list_entry (checkpoint_section_list,
struct saCkptCheckpointSection, list);
log_printf (LOG_LEVEL_DEBUG, "Checking section id %*s\n",
@@ -404,9 +807,12 @@
static int ckpt_exec_init_fn (void)
{
- // Initialize the saved ring ID.
+ /*
+ * Initialize the saved ring ID.
+ */
saved_ring_id.seq = 0;
- saved_ring_id.rep.s_addr = this_ip.sin_addr.s_addr;
+ saved_ring_id.rep.s_addr = this_ip.sin_addr.s_addr;
+
#ifdef TODO
int res;
res = totempg_recovery_plug_create (&ckpt_checkpoint_recovery_plug_handle);
@@ -445,9 +851,11 @@
}
#ifdef TODO
-/* todo close section iterators
+/* TODO close section iterators
+ */
+/*
+ * TODO what about exit of open checkpoints
*/
-// TODO what about exit of open checkpoints
if (conn_info->ais_ci.u.libckpt_ci.sectionIterator.sectionIteratorEntries) {
free (conn_info->ais_ci.u.libckpt_ci.sectionIterator.sectionIteratorEntries);
@@ -511,10 +919,11 @@
ckptCheckpoint->unlinked = 0;
list_init (&ckptCheckpoint->list);
list_init (&ckptCheckpoint->checkpointSectionsListHead);
- list_add (&ckptCheckpoint->list, &checkpointListHead);
+ list_add (&ckptCheckpoint->list, &checkpoint_list_head);
ckptCheckpoint->referenceCount = 0;
ckptCheckpoint->retention_timer = 0;
ckptCheckpoint->expired = 0;
+ initialize_ckpt_refcount_array(ckptCheckpoint->ckpt_refcount);
/*
* Add in default checkpoint section
@@ -530,7 +939,7 @@
ckptCheckpointSection->sectionDescriptor.sectionSize = 0;
ckptCheckpointSection->sectionDescriptor.expirationTime = SA_TIME_END;
ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID;
- ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; // current time
+ ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; /*current time*/
ckptCheckpointSection->sectionData = 0;
ckptCheckpointSection->expiration_timer = 0;
}
@@ -548,7 +957,26 @@
*/
log_printf (LOG_LEVEL_DEBUG, "CHECKPOINT opened is %p\n", ckptCheckpoint);
ckptCheckpoint->referenceCount += 1;
-
+
+ /*
+ * Add the connection reference information to the Checkpoint to be
+ * sent out later as a part of the sync process.
+ *
+ */
+
+ int proc_index = findProcessorIndex(&source_addr,ckptCheckpoint->ckpt_refcount);
+ if (proc_index == -1) {/* Could not find, lets set the processor to an index.*/
+ proc_index = setProcessorIndex(&source_addr,ckptCheckpoint->ckpt_refcount);
+ }
+ if (proc_index != -1 ) {
+ ckptCheckpoint->ckpt_refcount[proc_index].addr = source_addr;
+ ckptCheckpoint->ckpt_refcount[proc_index].count++;
+ }
+ else {
+ log_printf (LOG_LEVEL_ERROR,
+ "CKPT: MAX LIMIT OF PROCESSORS reached. Cannot store new proc %p info.\n",
+ ckptCheckpoint);
+ }
/*
* Reset retention duration since this checkpoint was just opened
*/
@@ -581,10 +1009,62 @@
sizeof (struct res_lib_ckpt_checkpointopen));
}
-// return (error == SA_AIS_OK ? 0 : -1);
+/* return (error == SA_AIS_OK ? 0 : -1); */
+ return (0);
+}
+
+/*
+ * Executive handler for a synchronize-state message.
+ *
+ * A message whose previous_ring_id equals saved_ring_id is ignored.
+ * Otherwise the checkpoint named in the message is opened on behalf of
+ * the message's source_addr through
+ * message_handler_req_exec_ckpt_checkpointopen(), and the section
+ * described by the message is created with recovery_section_create().
+ * A failure to create the section is logged. Always returns 0.
+ *
+ * The source_addr and endian_conversion_required parameters are not used;
+ * the address carried inside the message is used instead.
+ */
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required) {
+	int retcode;
+	struct req_exec_ckpt_checkpointopen request_open_exec;
+	struct req_lib_ckpt_checkpointopen request_open_lib;
+	struct req_exec_ckpt_synchronize_state *req_exec_ckpt_sync_state
+		= (struct req_exec_ckpt_synchronize_state *)message;
+
+	/*
+	 * If the Incoming message's previous ring id == saved_ring_id
+	 * Ignore because we have seen this message before.
+	 */
+	if (memcmp (&req_exec_ckpt_sync_state->previous_ring_id, &saved_ring_id,sizeof (struct memb_ring_id)) == 0) {
+		return(0);
+	}
+	/*
+	 * Only the name and creation attributes are filled in below; zero
+	 * the remaining fields so the open handler never reads
+	 * indeterminate values.
+	 */
+	memset (&request_open_lib, 0, sizeof (request_open_lib));
+	memset (&request_open_exec, 0, sizeof (request_open_exec));
+	request_open_lib.checkpointName = req_exec_ckpt_sync_state->checkpointName;
+	request_open_lib.checkpointCreationAttributes = req_exec_ckpt_sync_state->checkpointCreationAttributes;
+	request_open_exec.req_lib_ckpt_checkpointopen = request_open_lib;
+	message_handler_req_exec_ckpt_checkpointopen(&request_open_exec, req_exec_ckpt_sync_state->source_addr, 0);
+
+	retcode = recovery_section_create (&req_exec_ckpt_sync_state->sectionDescriptor,
+		&req_exec_ckpt_sync_state->checkpointName);
+	if (retcode != SA_AIS_OK) {
+		log_printf(LOG_LEVEL_ERROR, "CKPT: message_handler_req_exec_ckpt_synchronize_state\n");
+		log_printf(LOG_LEVEL_ERROR, "CKPT: recovery_section_create returned %d\n",retcode);
+	}
+
return (0);
}
+/*
+ * Executive handler for a synchronize-section message.
+ *
+ * The section data that directly follows the request structure in the
+ * message is written into the named checkpoint section at the offset
+ * carried in the request, using recovery_section_write(). A failure is
+ * logged. Always returns 0.
+ */
+static int message_handler_req_exec_ckpt_synchronize_section (void *message, struct in_addr source_addr, int endian_conversion_required) {
+	struct req_exec_ckpt_synchronize_section *sync_section =
+		(struct req_exec_ckpt_synchronize_section *)message;
+	char *section_data =
+		(char *)sync_section + sizeof (struct req_exec_ckpt_synchronize_section);
+	int retcode;
+
+	retcode = recovery_section_write(&sync_section->sectionId,
+		&sync_section->checkpointName,
+		section_data,
+		sync_section->dataOffSet,
+		sync_section->dataSize);
+	if (retcode == SA_AIS_OK) {
+		return (0);
+	}
+
+	/* The write was rejected; report it and carry on. */
+	log_printf(LOG_LEVEL_ERROR, "CKPT: message_handler_req_exec_ckpt_synchronize_section\n");
+	log_printf(LOG_LEVEL_ERROR, "CKPT: recovery_section_write returned %d\n",retcode);
+
+	return (0);
+}
+
+
unsigned int abstime_to_msec (SaTimeT time)
{
struct timeval tv;
@@ -644,6 +1124,20 @@
}
checkpoint->referenceCount--;
+ /*
+ * Modify the connection reference information to the Checkpoint to be
+ * sent out later as a part of the sync process.
+ */
+
+ int proc_index = findProcessorIndex(&source_addr, checkpoint->ckpt_refcount);
+ if (proc_index != -1 ) {
+ checkpoint->ckpt_refcount[proc_index].count--;
+ }
+ else {
+ log_printf (LOG_LEVEL_ERROR,
+ "CKPT: Could Not find Processor Info %p info.\n",
+ checkpoint);
+ }
assert (checkpoint->referenceCount >= 0);
log_printf (LOG_LEVEL_DEBUG, "disconnect called, new CKPT ref count is %d\n",
checkpoint->referenceCount);
@@ -652,10 +1146,10 @@
* If checkpoint has been unlinked and this is the last reference, delete it
*/
if (checkpoint->unlinked && checkpoint->referenceCount == 0) {
- log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");
+ log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");
checkpoint_release (checkpoint);
} else
- if (checkpoint->referenceCount == 0) {
+ if (checkpoint->referenceCount == 0) {
poll_timer_add (aisexec_poll_handle,
checkpoint->checkpointCreationAttributes.retentionDuration / 1000000,
checkpoint,
@@ -791,6 +1285,96 @@
return (0);
}
+/*
+ * Create a checkpoint section during recovery.
+ *
+ * sectionDescriptor: descriptor of the section to create.
+ * checkpointName:    name of the checkpoint that receives the section.
+ *
+ * The new section gets its own copy of the section id and a zero-filled
+ * data buffer of sectionDescriptor->sectionSize bytes. Its state is set
+ * to SA_CKPT_SECTION_VALID, and an expiration timer is added unless the
+ * expiration time is SA_TIME_END.
+ *
+ * Returns SA_AIS_OK on success, SA_AIS_ERR_NOT_EXIST when the checkpoint
+ * is unknown, SA_AIS_ERR_EXIST when the section already exists, or
+ * SA_AIS_ERR_NO_MEMORY when an allocation fails.
+ *
+ * NOTE(review): sectionDescriptor->sectionId.id is dereferenced here;
+ * when the descriptor arrived in a message from another processor that
+ * pointer may not be meaningful locally - confirm how the id bytes are
+ * transported.
+ */
+static int recovery_section_create (SaCkptSectionDescriptorT *sectionDescriptor,
+	SaNameT *checkpointName) {
+	struct saCkptCheckpoint *ckptCheckpoint;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	void *initialData;
+	void *sectionId;
+	SaErrorT error = SA_AIS_OK;
+
+	ckptCheckpoint = ckpt_checkpoint_find_global (checkpointName);
+	if (ckptCheckpoint == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	/*
+	 * Determine if user-specified checkpoint ID already exists
+	 */
+	ckptCheckpointSection = ckpt_checkpoint_find_globalSection (ckptCheckpoint,
+		((char *)sectionDescriptor->sectionId.id),
+		(int)sectionDescriptor->sectionId.idLen);
+	if (ckptCheckpointSection) {
+		error = SA_AIS_ERR_EXIST;
+		goto error_exit;
+	}
+
+	/*
+	 * Allocate checkpoint section
+	 */
+	ckptCheckpointSection = malloc (sizeof (struct saCkptCheckpointSection));
+	if (ckptCheckpointSection == 0) {
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto error_exit;
+	}
+	/*
+	 * Allocate checkpoint section data
+	 */
+	initialData = malloc (sectionDescriptor->sectionSize);
+	if (initialData == 0) {
+		free (ckptCheckpointSection);
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto error_exit;
+	}
+	/*
+	 * Allocate checkpoint section id
+	 */
+	sectionId = malloc ((int)sectionDescriptor->sectionId.idLen);
+	if (sectionId == 0) {
+		free (ckptCheckpointSection);
+		free (initialData);
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto error_exit;
+	}
+	/*
+	 * Copy checkpoint section ID and initialize data.
+	 */
+	memcpy (sectionId, ((char *)sectionDescriptor->sectionId.id),
+		(int)sectionDescriptor->sectionId.idLen);
+
+	memset (initialData, 0, sectionDescriptor->sectionSize);
+
+	/*
+	 * Configure checkpoint section
+	 */
+	memcpy(&ckptCheckpointSection->sectionDescriptor,
+		sectionDescriptor,
+		sizeof(SaCkptSectionDescriptorT));
+	/*
+	 * The descriptor copy above also copied the caller's id pointer;
+	 * point the section at its own copy of the id instead, so the copy
+	 * is owned by the section rather than leaked.
+	 */
+	ckptCheckpointSection->sectionDescriptor.sectionId.id = sectionId;
+	ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID;
+	ckptCheckpointSection->sectionData = initialData;
+	ckptCheckpointSection->expiration_timer = 0;
+
+	if (sectionDescriptor->expirationTime != SA_TIME_END) {
+		poll_timer_add (aisexec_poll_handle,
+			abstime_to_msec (ckptCheckpointSection->sectionDescriptor.expirationTime),
+			ckptCheckpointSection,
+			timer_function_section_expire,
+			&ckptCheckpointSection->expiration_timer);
+	}
+
+	/*
+	 * Add checkpoint section to checkpoint
+	 */
+	list_init (&ckptCheckpointSection->list);
+	list_add (&ckptCheckpointSection->list,
+		&ckptCheckpoint->checkpointSectionsListHead);
+
+error_exit:
+	return (error);
+
+}
+
static int message_handler_req_exec_ckpt_sectioncreate (void *message, struct in_addr source_addr, int endian_conversion_required) {
struct req_exec_ckpt_sectioncreate *req_exec_ckpt_sectioncreate = (struct req_exec_ckpt_sectioncreate *)message;
struct req_lib_ckpt_sectioncreate *req_lib_ckpt_sectioncreate = (struct req_lib_ckpt_sectioncreate *)&req_exec_ckpt_sectioncreate->req_lib_ckpt_sectioncreate;
@@ -804,7 +1388,7 @@
log_printf (LOG_LEVEL_DEBUG, "Executive request to create a checkpoint section.\n");
ckptCheckpoint = ckpt_checkpoint_find_global (&req_exec_ckpt_sectioncreate->checkpointName);
if (ckptCheckpoint == 0) {
- error = SA_AIS_ERR_LIBRARY; // TODO find the right error for this
+ error = SA_AIS_ERR_LIBRARY; /* TODO find the right error for this*/
goto error_exit;
}
@@ -867,7 +1451,7 @@
ckptCheckpointSection->sectionDescriptor.sectionSize = req_lib_ckpt_sectioncreate->initialDataSize;
ckptCheckpointSection->sectionDescriptor.expirationTime = req_lib_ckpt_sectioncreate->expirationTime;
ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID;
- ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; // TODO current time
+ ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; /* TODO current time */
ckptCheckpointSection->sectionData = initialData;
ckptCheckpointSection->expiration_timer = 0;
@@ -1015,6 +1599,58 @@
return (0);
}
+/*
+ * Write recovered data into an existing checkpoint section.
+ *
+ * sectionId:      id of the section to write.
+ * checkpointName: name of the checkpoint that owns the section.
+ * newData:        bytes to copy into the section.
+ * dataOffSet:     offset within the section data at which to write.
+ * dataSize:       number of bytes to write; 0 writes nothing.
+ *
+ * Returns SA_AIS_OK on success, SA_AIS_ERR_NOT_EXIST when the checkpoint
+ * or the section cannot be found, or SA_AIS_ERR_ACCESS when the write
+ * would extend past the end of the section data.
+ */
+static int recovery_section_write(SaCkptSectionIdT *sectionId,
+	SaNameT *checkpointName,
+	void *newData,
+	SaUint32T dataOffSet,
+	SaUint32T dataSize) {
+	struct saCkptCheckpoint *ckptCheckpoint;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	SaSizeT sectionSize;
+	SaErrorT error = SA_AIS_OK;
+	char *sd;
+
+	log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery_section_write.\n");
+	ckptCheckpoint = ckpt_checkpoint_find_global (checkpointName);
+	if (ckptCheckpoint == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	/*
+	 * Find checkpoint section to be written
+	 */
+	ckptCheckpointSection = ckpt_checkpoint_find_globalSection (ckptCheckpoint,
+		((char *)sectionId->id),
+		(int)sectionId->idLen);
+	if (ckptCheckpointSection == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	/*
+	 * If write would extend past end of section data, return error.
+	 * The offset and the length are checked separately so that their
+	 * sum cannot wrap around and slip past the bound.
+	 */
+	sectionSize = ckptCheckpointSection->sectionDescriptor.sectionSize;
+	if (dataOffSet > sectionSize || dataSize > sectionSize - dataOffSet) {
+		error = SA_AIS_ERR_ACCESS;
+		goto error_exit;
+	}
+
+	/*
+	 * Write checkpoint section to section data
+	 */
+	if (dataSize > 0) {
+		sd = (char *)ckptCheckpointSection->sectionData;
+		memcpy (&sd[dataOffSet],
+			newData,
+			dataSize);
+	}
+error_exit:
+	return (error);
+}
+
+
static int message_handler_req_exec_ckpt_sectionwrite (void *message, struct in_addr source_addr, int endian_conversion_required) {
struct req_exec_ckpt_sectionwrite *req_exec_ckpt_sectionwrite = (struct req_exec_ckpt_sectionwrite *)message;
struct req_lib_ckpt_sectionwrite *req_lib_ckpt_sectionwrite = (struct req_lib_ckpt_sectionwrite *)&req_exec_ckpt_sectionwrite->req_lib_ckpt_sectionwrite;
@@ -1032,7 +1668,9 @@
goto error_exit;
}
-//printf ("writing checkpoint section is %s\n", ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite));
+/*
+ printf ("writing checkpoint section is %s\n", ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite));
+*/
/*
* Find checkpoint section to be written
*/
@@ -1147,7 +1785,7 @@
*/
ckptCheckpointSection->sectionDescriptor.sectionSize = req_lib_ckpt_sectionoverwrite->dataSize;
ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID;
- ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; // TODO current time
+ ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; /* TODO current time */
ckptCheckpointSection->sectionData = sectionData;
/*
@@ -1178,7 +1816,7 @@
ckptCheckpoint = ckpt_checkpoint_find_global (&req_exec_ckpt_sectionread->checkpointName);
if (ckptCheckpoint == 0) {
- error = SA_AIS_ERR_LIBRARY; // TODO find the right error for this
+ error = SA_AIS_ERR_LIBRARY; /* TODO find the right error for this */
goto error_exit;
}
@@ -1258,7 +1896,7 @@
conn_info->ais_ci.u.libckpt_ci.sectionIterator.iteratorCount = 0;
conn_info->ais_ci.u.libckpt_ci.sectionIterator.iteratorPos = 0;
list_add (&conn_info->ais_ci.u.libckpt_ci.sectionIterator.list,
- &checkpointIteratorListHead);
+ &checkpoint_iterator_list_head);
list_init (&conn_info->ais_ci.u.libckpt_ci.checkpoint_list);
error = SA_AIS_OK;
}
@@ -1400,7 +2038,7 @@
struct saCkptCheckpoint *checkpoint;
int memoryUsed = 0;
int numberOfSections = 0;
- struct list_head *checkpointSectionList;
+ struct list_head *checkpoint_section_list;
struct saCkptCheckpointSection *checkpointSection;
log_printf (LOG_LEVEL_DEBUG, "in status get\n");
@@ -1410,11 +2048,11 @@
*/
checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_checkpointstatusget->checkpointName);
- for (checkpointSectionList = checkpoint->checkpointSectionsListHead.next;
- checkpointSectionList != &checkpoint->checkpointSectionsListHead;
- checkpointSectionList = checkpointSectionList->next) {
+ for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next;
+ checkpoint_section_list != &checkpoint->checkpointSectionsListHead;
+ checkpoint_section_list = checkpoint_section_list->next) {
- checkpointSection = list_entry (checkpointSectionList,
+ checkpointSection = list_entry (checkpoint_section_list,
struct saCkptCheckpointSection, list);
memoryUsed += checkpointSection->sectionDescriptor.sectionSize;
@@ -1614,7 +2252,9 @@
iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite);
iovecs[1].iov_len = req_lib_ckpt_sectionwrite->header.size - sizeof (struct req_lib_ckpt_sectionwrite);
-//printf ("LIB writing checkpoint section is %s\n", ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite));
+/*
+ printf ("LIB writing checkpoint section is %s\n", ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite));
+*/
if (iovecs[1].iov_len > 0) {
assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0);
} else {
@@ -1730,7 +2370,7 @@
struct saCkptCheckpointSection *ckptCheckpointSection;
struct saCkptSectionIteratorEntry *ckptSectionIteratorEntries;
struct saCkptSectionIterator *ckptSectionIterator;
- struct list_head *checkpointSectionList;
+ struct list_head *checkpoint_section_list;
int addEntry = 0;
int iteratorEntries = 0;
SaErrorT error = SA_AIS_OK;
@@ -1747,11 +2387,11 @@
/*
* Iterate list of checkpoint sections
*/
- for (checkpointSectionList = ckptCheckpoint->checkpointSectionsListHead.next;
- checkpointSectionList != &ckptCheckpoint->checkpointSectionsListHead;
- checkpointSectionList = checkpointSectionList->next) {
+ for (checkpoint_section_list = ckptCheckpoint->checkpointSectionsListHead.next;
+ checkpoint_section_list != &ckptCheckpoint->checkpointSectionsListHead;
+ checkpoint_section_list = checkpoint_section_list->next) {
- ckptCheckpointSection = list_entry (checkpointSectionList,
+ ckptCheckpointSection = list_entry (checkpoint_section_list,
struct saCkptCheckpointSection, list);
addEntry = 1;
diff -uNr --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet --exclude=init --exclude=LICENSE --exclude=Makefile --exclude=man --exclude=README.devmap --exclude=SECURITY --exclude=TODO --exclude=CHANGELOG --exclude=conf --exclude=loc --exclude=Makefile.samples --exclude=QUICKSTART --exclude=test --exclude=.cdtproject --exclude=.project ../latest/include/ipc_ckpt.h ../bk_openais/include/ipc_ckpt.h
--- ../latest/include/ipc_ckpt.h 2005-02-25 14:07:12.000000000 -0600
+++ ../bk_openais/include/ipc_ckpt.h 2005-02-24 14:55:42.000000000 -0600
@@ -37,6 +37,8 @@
#include "../include/ipc_gen.h"
#include "../include/ais_types.h"
#include "../include/saCkpt.h"
+#include "../exec/totemsrp.h"
+#include "../exec/ckpt.h"
enum req_lib_ckpt_checkpoint_types {
MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN = 1,
@@ -326,4 +328,24 @@
struct res_header header;
};
+struct req_exec_ckpt_synchronize_state { /* checkpoint state sync request; held as request_state in struct checkpoint_sync (exec/ckpt.c) */
+ struct req_header header; /* request header; presumably carries MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE -- TODO confirm */
+ struct memb_ring_id previous_ring_id; /* NOTE(review): presumably the ring id in effect before the membership change -- confirm */
+ SaNameT checkpointName; /* name identifying the checkpoint this message refers to */
+ SaCkptCheckpointCreationAttributesT checkpointCreationAttributes; /* creation attributes of that checkpoint */
+ SaCkptSectionDescriptorT sectionDescriptor; /* NOTE(review): single descriptor, so presumably one message per section -- confirm */
+ struct in_addr source_addr; /* NOTE(review): presumably the address of the originating node -- confirm */
+ SaUint32T ref_count; /* NOTE(review): presumably the checkpoint's reference count -- confirm */
+};
+
+struct req_exec_ckpt_synchronize_section { /* section data sync request; held as request_section in struct checkpoint_sync (exec/ckpt.c) */
+ struct req_header header; /* request header; presumably carries MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION -- TODO confirm */
+ SaNameT checkpointName; /* name identifying the checkpoint that owns the section */
+ SaCkptSectionIdT sectionId; /* identifier of the section this message refers to */
+ SaUint32T dataOffSet; /* NOTE(review): presumably the byte offset of this chunk within the section data -- confirm */
+ SaUint32T dataSize; /* NOTE(review): presumably the byte count of this chunk; may be bounded by CKPT_MAX_SECTION_DATA_SEND -- confirm */
+};
+
+
+
#endif /* IPC_CKPT_H_DEFINED */
diff -uNr --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet --exclude=init --exclude=LICENSE --exclude=Makefile --exclude=man --exclude=README.devmap --exclude=SECURITY --exclude=TODO --exclude=CHANGELOG --exclude=conf --exclude=loc --exclude=Makefile.samples --exclude=QUICKSTART --exclude=test --exclude=.cdtproject --exclude=.project ../latest/include/ipc_gen.h ../bk_openais/include/ipc_gen.h
--- ../latest/include/ipc_gen.h 2005-02-25 14:07:12.000000000 -0600
+++ ../bk_openais/include/ipc_gen.h 2005-02-24 13:42:21.000000000 -0600
@@ -69,6 +69,8 @@
MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE,
MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE,
MESSAGE_REQ_EXEC_CKPT_SECTIONREAD,
+ MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE,
+ MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION,
MESSAGE_REQ_EXEC_EVT_EVENTDATA,
MESSAGE_REQ_EXEC_EVT_CHANCMD,
MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA
More information about the Openais
mailing list