[Openais] Patch II design review !!
Muni Bajpai
muniba at nortel.com
Mon Feb 21 11:10:31 PST 2005
Skipped content of type multipart/alternative-------------- next part --------------
--- ../latest/exec/ckpt.c 2005-02-20 18:08:13.000000000 -0600
+++ ../bk_openais/exec/ckpt.c 2005-02-21 12:14:02.000000000 -0600
@@ -63,11 +63,22 @@
DECLARE_LIST_INIT(checkpointIteratorListHead);
+DECLARE_LIST_INIT(recoverySyncListHead);
+
+DECLARE_LIST_INIT(checkpointRecoveryListHead);
+
struct checkpoint_cleanup {
struct list_head list;
struct saCkptCheckpoint *checkpoint;
};
+/*
+ * One queued state-synchronization message.  Entries are created by
+ * ckpt_recovery_inititialize() and linked onto recoverySyncListHead;
+ * ckpt_recovery_process() takes them off that list and frees them.
+ */
+struct checkpoint_sync {
+ struct list_head list; /* linkage on recoverySyncListHead */
+ struct req_exec_ckpt_synchronize_state request; /* message body handed to totempg_mcast() */
+};
+
+static void *tok_call_handle = 0;
+
//TODO static totempg_recovery_plug_handle ckpt_checkpoint_recovery_plug_handle;
static int ckpt_exec_init_fn (void);
@@ -78,6 +89,8 @@
static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct in_addr source_addr, int endian_conversion_required);
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required);
+
static int message_handler_req_exec_ckpt_checkpointclose (void *message, struct in_addr source_addr, int endian_conversion_required);
static int message_handler_req_exec_ckpt_checkpointunlink (void *message, struct in_addr source_addr, int endian_conversion_required);
@@ -133,24 +146,24 @@
static int message_handler_req_lib_ckpt_sectioniteratorinitialize (struct conn_info *conn_info, void *message);
static int message_handler_req_lib_ckpt_sectioniteratornext (struct conn_info *conn_info, void *message);
-static int ckpt_confchg_fn (
- enum totempg_configuration_type configuration_type,
- struct in_addr *member_list, void *member_list_private,
- int member_list_entries,
- struct in_addr *left_list, void *left_list_private,
- int left_list_entries,
- struct in_addr *joined_list, void *joined_list_private,
- int joined_list_entries,
- struct memb_ring_id *ring_id) {
+/*RECOVERY_MUNI*/
+static void ckpt_recovery_inititialize ();
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *);
+static int ckpt_recovery_finalize();
+static int ckpt_recovery_abort();
+static int ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries);
-#ifdef TODO
- if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
- totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
- }
-#endif
+static struct memb_ring_id saved_ring_id;
- return (0);
-}
+static int ckpt_confchg_fn(
+ enum totempg_configuration_type configuration_type,
+ struct in_addr *member_list, void *member_list_private,
+ int member_list_entries,
+ struct in_addr *left_list, void *left_list_private,
+ int left_list_entries,
+ struct in_addr *joined_list, void *joined_list_private,
+ int joined_list_entries,
+ struct memb_ring_id *ring_id);
struct libais_handler ckpt_libais_handlers[] =
{
@@ -258,7 +271,8 @@
message_handler_req_exec_ckpt_sectionexpirationtimeset,
message_handler_req_exec_ckpt_sectionwrite,
message_handler_req_exec_ckpt_sectionoverwrite,
- message_handler_req_exec_ckpt_sectionread
+ message_handler_req_exec_ckpt_sectionread,
+ message_handler_req_exec_ckpt_synchronize_state
};
struct service_handler ckpt_service_handler = {
@@ -273,7 +287,253 @@
.exec_dump_fn = 0
};
-static struct memb_ring_id saved_ring_id;
+/*
+ * Claim a slot for proc_addr in a checkpoint's per-processor
+ * reference-count table.
+ *
+ * The table is scanned from index 0.  The first unused slot
+ * (addr.s_addr == 0) receives proc_addr and its index is returned.
+ * Returns -1 when proc_addr is met before any unused slot (it already
+ * owns a slot) or when all PROCESSOR_COUNT_MAX slots are in use.
+ */
+static int setProcessorIndex(struct in_addr *proc_addr,
+		struct ckpt_refcnt ckpt_refcount[]) { //RECOVERY_MUNI
+	int slot = 0;
+
+	while (slot < PROCESSOR_COUNT_MAX) {
+		struct ckpt_refcnt *entry = &ckpt_refcount[slot];
+
+		/* This processor already owns a slot: nothing to claim. */
+		if (entry->addr.s_addr == proc_addr->s_addr) {
+			return -1;
+		}
+
+		/* First free slot: record the processor here. */
+		if (entry->addr.s_addr == 0) {
+			entry->addr = *proc_addr;
+			return slot;
+		}
+
+		slot++;
+	}
+
+	/* No free slot left for a new processor. */
+	return -1;
+}
+
+/*
+ * Look up proc_addr in a checkpoint's per-processor reference-count table.
+ *
+ * Returns the index of the first slot whose address equals proc_addr,
+ * or -1 if none of the PROCESSOR_COUNT_MAX slots matches.
+ */
+static int findProcessorIndex(struct in_addr *proc_addr,
+		struct ckpt_refcnt ckpt_refcount[]) { //RECOVERY_MUNI
+	int slot;
+	int found = -1;
+
+	for (slot = 0; found == -1 && slot < PROCESSOR_COUNT_MAX; slot++) {
+		if (ckpt_refcount[slot].addr.s_addr == proc_addr->s_addr) {
+			found = slot;
+		}
+	}
+
+	return found;
+}
+
+/*
+ * Zero all PROCESSOR_COUNT_MAX slots of a checkpoint's per-processor
+ * reference-count table.
+ */
+static void initialize_ckpt_refcount_array (struct ckpt_refcnt ckpt_refcount[]) {
+	size_t table_bytes = sizeof(struct ckpt_refcnt) * PROCESSOR_COUNT_MAX;
+
+	memset(ckpt_refcount, 0, table_bytes);
+}
+
+/*
+ * Snapshot the local checkpoint state at the start of recovery.
+ *
+ * For every checkpoint on checkpointListHead:
+ *   - a copy of the checkpoint structure is added to
+ *     checkpointRecoveryListHead, and
+ *   - one checkpoint_sync message per section is queued on
+ *     recoverySyncListHead, carrying saved_ring_id, this node's address
+ *     and this node's reference count for the checkpoint.
+ *
+ * If this node has no slot in the checkpoint's ckpt_refcount table, an
+ * error is logged for each section and no message is queued for it.
+ * The slot lookup happens before the message is allocated so nothing is
+ * leaked on that path.
+ *
+ * NOTE(review): the checkpoint is copied with memcpy, so the copy's
+ * checkpointSectionsListHead still refers to the original's section
+ * nodes, and those nodes still link back to the original list head.
+ * Confirm this shallow copy is intended before the originals are freed.
+ */
+static void ckpt_recovery_inititialize () {
+	struct list_head *checkpointList;
+	struct list_head *checkpointSectionList;
+	struct saCkptCheckpoint *checkpoint;
+	struct saCkptCheckpoint *savedCheckpoint;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	struct checkpoint_sync *sync_msg;
+	int proc_index;
+
+	for (checkpointList = checkpointListHead.next;
+		checkpointList != &checkpointListHead;
+		checkpointList = checkpointList->next) {
+
+		checkpoint = list_entry (checkpointList,
+			struct saCkptCheckpoint, list);
+
+		/* Save off the elements in the new list */
+		savedCheckpoint = malloc (sizeof (*savedCheckpoint));
+		assert (savedCheckpoint);
+		memcpy (savedCheckpoint, checkpoint, sizeof (*savedCheckpoint));
+		list_init (&savedCheckpoint->list);
+		list_add (&savedCheckpoint->list, &checkpointRecoveryListHead);
+
+		/* This node's slot is the same for every section of the checkpoint. */
+		proc_index = findProcessorIndex (&this_ip.sin_addr,
+			savedCheckpoint->ckpt_refcount);
+
+		for (checkpointSectionList = checkpoint->checkpointSectionsListHead.next;
+			checkpointSectionList != &checkpoint->checkpointSectionsListHead;
+			checkpointSectionList = checkpointSectionList->next) {
+
+			ckptCheckpointSection = list_entry (checkpointSectionList,
+				struct saCkptCheckpointSection, list);
+
+			/* Bail out before allocating so the message is not leaked. */
+			if (proc_index == -1) {
+				log_printf (LOG_LEVEL_ERROR,
+					"CKPT: Could not find the checkpoint entry for this processor %p \n",
+					checkpoint);
+				continue;
+			}
+
+			/* Create and save a new Sync message. */
+			sync_msg = malloc (sizeof (*sync_msg));
+			assert (sync_msg);
+			memcpy (&sync_msg->request.checkpointName,
+				&savedCheckpoint->name, sizeof (SaNameT));
+			memcpy (&sync_msg->request.checkpointCreationAttributes,
+				&savedCheckpoint->checkpointCreationAttributes,
+				sizeof (SaCkptCheckpointCreationAttributesT));
+			memcpy (&sync_msg->request.section, ckptCheckpointSection,
+				sizeof (struct saCkptCheckpointSection));
+			sync_msg->request.previous_ring_id = saved_ring_id;
+			sync_msg->request.source_addr = this_ip.sin_addr;
+			memcpy (&sync_msg->request.ref_count,
+				&savedCheckpoint->ckpt_refcount[proc_index].count,
+				sizeof (SaUint32T));
+
+			list_init (&sync_msg->list);
+			list_add (&sync_msg->list, &recoverySyncListHead);
+		}
+	}
+#if 0
+	int res = totempg_token_callback_create(&tok_call_handle,
+		TOTEMPG_CALLBACK_TOKEN_RECEIVED,
+		0,
+		ckpt_recovery_process,
+		NULL);
+#endif
+}
+
+/*
+ * Token callback: multicast one queued checkpoint_sync message.
+ *
+ * Takes the first entry on recoverySyncListHead, fills in its message
+ * header, multicasts it with totempg_mcast() and then unlinks and frees
+ * the entry.  Does nothing when the queue is empty.
+ *
+ * type and data are not used.
+ *
+ * Always returns 0.
+ */
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *data ) {
+	struct req_exec_ckpt_synchronize_state *request_exec_sync_state;
+	struct checkpoint_sync *sync_element;
+	struct iovec iovec;
+	int res;
+
+	//Check for empty list here
+	if (list_empty(&recoverySyncListHead)) {
+		return (0);
+	}
+
+	//Extract the element
+	sync_element = list_entry (recoverySyncListHead.next,
+		struct checkpoint_sync, list);
+	request_exec_sync_state = &sync_element->request;
+
+	log_printf (LOG_LEVEL_DEBUG, "callback recover_process.\n");
+	request_exec_sync_state->header.size =
+		sizeof (struct req_exec_ckpt_synchronize_state);
+	request_exec_sync_state->header.id = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE;
+
+	/*
+	 * iov_base must be the address of the request itself, not the
+	 * address of the local pointer variable that refers to it.
+	 */
+	iovec.iov_base = (char *)request_exec_sync_state;
+	iovec.iov_len = sizeof (struct req_exec_ckpt_synchronize_state);
+
+	/*
+	 * Keep the call outside assert() so the message is still sent when
+	 * assertions are compiled out with NDEBUG.
+	 */
+	res = totempg_mcast (&iovec, 1, TOTEMPG_AGREED);
+	assert (res == 0);
+	(void)res;
+
+	list_del (&sync_element->list);
+	free (sync_element);
+
+	//Check for empty list here
+	if (list_empty(&recoverySyncListHead)) {
+		//todo Call finalize here.
+		return (0);
+	}
+#if 0
+	//We have more to send ...
+	int res = totempg_token_callback_create(&tok_call_handle,
+		TOTEMPG_CALLBACK_TOKEN_RECEIVED,
+		0,
+		ckpt_recovery_process,
+		NULL);
+#endif
+	/* More entries remain queued; do not fall off the end of a non-void function. */
+	return (0);
+}
+
+/*
+ * Replace the live checkpoint list with the list saved during recovery.
+ *
+ * Every checkpoint on checkpointListHead is unlinked and freed, the
+ * entries on checkpointRecoveryListHead are then moved onto
+ * checkpointListHead, and checkpointRecoveryListHead is left empty.
+ *
+ * Always returns 0.
+ */
+static int ckpt_recovery_finalize () {
+	struct saCkptCheckpoint *checkpoint;
+
+	while (!list_empty (&checkpointListHead)) {
+		checkpoint = list_entry (checkpointListHead.next,
+			struct saCkptCheckpoint, list);
+		list_del (&checkpoint->list);
+		free (checkpoint);
+	}
+	list_init (&checkpointListHead);
+
+	/*
+	 * Move the saved entries across.  Copying only the head structure is
+	 * not enough: the first and last entries would still point back at
+	 * checkpointRecoveryListHead, so their links are re-aimed at the new
+	 * head.  An empty saved list leaves checkpointListHead empty.
+	 */
+	if (!list_empty (&checkpointRecoveryListHead)) {
+		checkpointListHead.next = checkpointRecoveryListHead.next;
+		checkpointListHead.prev = checkpointRecoveryListHead.prev;
+		checkpointListHead.next->prev = &checkpointListHead;
+		checkpointListHead.prev->next = &checkpointListHead;
+	}
+	list_init (&checkpointRecoveryListHead);
+
+	return (0);
+}
+
+/*
+ * Abort an in-progress recovery.
+ *
+ * Currently a stub: it does nothing and always returns 0.
+ */
+static int ckpt_recovery_abort () {
+	//@TODO add some code here to abort the recovery process
+	return 0;
+}
+
+/*
+ * Clear the per-processor reference counts of members that left.
+ *
+ * For each address in left_list, every checkpoint on checkpointListHead
+ * that has a ckpt_refcount slot for that address gets that slot's count
+ * reset to 0.  Checkpoints without a slot for the address are skipped.
+ *
+ * left_list          addresses of the processors that left
+ * left_list_entries  number of addresses in left_list
+ *
+ * Always returns 0.
+ */
+static int ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries) {
+	struct list_head *checkpointList;
+	struct saCkptCheckpoint *checkpoint;
+	int i;
+	int index;
+
+	/*
+	 * Visit every departed member.  Stopping at left_list_entries - 1
+	 * would leave the last member's counts untouched whenever more than
+	 * one processor left.
+	 */
+	for (i = 0; i < left_list_entries; i++) {
+		for (checkpointList = checkpointListHead.next;
+			checkpointList != &checkpointListHead;
+			checkpointList = checkpointList->next) {
+
+			checkpoint = list_entry (checkpointList,
+				struct saCkptCheckpoint, list);
+
+			index = findProcessorIndex(&left_list[i], checkpoint->ckpt_refcount);
+			if (index == -1) {
+				continue;
+			}
+
+			checkpoint->ckpt_refcount[index].count = 0;
+			//TODO should we decrement the referenceCount also ? or is that done in close ?
+		}
+	}
+	return(0);
+}
+
+/*
+ * Configuration-change callback for the checkpoint service.
+ *
+ * On a regular configuration the ring id passed in is remembered in
+ * saved_ring_id.  On a transitional configuration left_list is handed to
+ * ckpt_recovery_process_members_exit().  Any other configuration type
+ * is ignored.  The member and joined lists are not used.
+ *
+ * Always returns 0.
+ */
+static int ckpt_confchg_fn (
+	enum totempg_configuration_type configuration_type,
+	struct in_addr *member_list, void *member_list_private,
+	int member_list_entries,
+	struct in_addr *left_list, void *left_list_private,
+	int left_list_entries,
+	struct in_addr *joined_list, void *joined_list_private,
+	int joined_list_entries,
+	struct memb_ring_id *ring_id) {
+
+	switch (configuration_type) {
+	case TOTEMPG_CONFIGURATION_REGULAR:
+#ifdef TODO
+		totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
+#endif
+		saved_ring_id = *ring_id; //RECOVERY_MUNI
+		break;
+
+	case TOTEMPG_CONFIGURATION_TRANSITIONAL:
+		ckpt_recovery_process_members_exit(left_list, left_list_entries);
+		break;
+
+	default:
+		break;
+	}
+
+	return (0);
+}
static struct saCkptCheckpoint *ckpt_checkpoint_find_global (SaNameT *name)
{
@@ -406,7 +666,8 @@
{
// Initialize the saved ring ID.
saved_ring_id.seq = 0;
- saved_ring_id.rep.s_addr = this_ip.sin_addr.s_addr;
+ saved_ring_id.rep.s_addr = this_ip.sin_addr.s_addr;
+
#ifdef TODO
int res;
res = totempg_recovery_plug_create (&ckpt_checkpoint_recovery_plug_handle);
@@ -515,6 +776,7 @@
ckptCheckpoint->referenceCount = 0;
ckptCheckpoint->retention_timer = 0;
ckptCheckpoint->expired = 0;
+ initialize_ckpt_refcount_array(ckptCheckpoint->ckpt_refcount);
/*
* Add in default checkpoint section
@@ -548,7 +810,25 @@
*/
log_printf (LOG_LEVEL_DEBUG, "CHECKPOINT opened is %p\n", ckptCheckpoint);
ckptCheckpoint->referenceCount += 1;
-
+
+ /*
+ * Add the connection reference information to the Checkpoint to be
+ * sent out later as a part of the sync process.
+ * RECOVERY_MUNI
+ */
+ int proc_index = findProcessorIndex(&source_addr,ckptCheckpoint->ckpt_refcount);
+ if (proc_index == -1) {//Could not find, lets set the processor to an index.
+ proc_index = setProcessorIndex(&source_addr,ckptCheckpoint->ckpt_refcount);
+ }
+ if (proc_index != -1 ) {
+ ckptCheckpoint->ckpt_refcount[proc_index].addr = source_addr;
+ ckptCheckpoint->ckpt_refcount[proc_index].count++;
+ }
+ else {
+ log_printf (LOG_LEVEL_ERROR,
+ "CKPT: MAX LIMIT OF PROCESSORS reached. Cannot store new proc %p info.\n",
+ ckptCheckpoint);
+ }
/*
* Reset retention duration since this checkpoint was just opened
*/
@@ -585,6 +865,66 @@
return (0);
}
+//RECOVERY_MUNI
+/*
+ * Handler for state-synchronization messages
+ * (struct req_exec_ckpt_synchronize_state).
+ *
+ * A message whose previous_ring_id differs from saved_ring_id is
+ * ignored.  Otherwise the message is replayed locally by calling the
+ * checkpoint-open handler and then the section-create handler, both
+ * with the source_addr stored inside the message.
+ *
+ * The source_addr and endian_conversion_required parameters are not
+ * used.  Always returns 0.
+ */
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required) {
+ struct req_exec_ckpt_synchronize_state *req_exec_ckpt_sync_state
+ = (struct req_exec_ckpt_synchronize_state *)message;
+ /* Only accept state that was captured on the ring this node last saved. */
+ if ((req_exec_ckpt_sync_state->previous_ring_id.seq != saved_ring_id.seq)
+ || (req_exec_ckpt_sync_state->previous_ring_id.rep.s_addr != saved_ring_id.rep.s_addr)) {
+ return(0);
+ }
+ struct req_exec_ckpt_checkpointopen request_open_exec;
+ struct req_lib_ckpt_checkpointopen request_open_lib;
+
+ struct req_exec_ckpt_sectioncreate request_section_create_exec;
+ struct req_lib_ckpt_sectioncreate request_section_create_lib;
+
+ /*
+ * Step 1: replay the checkpoint open.
+ * NOTE(review): only checkpointName and checkpointCreationAttributes
+ * are filled in; every other field of request_open_lib and
+ * request_open_exec is left uninitialized.  Confirm the open handler
+ * reads nothing else.
+ */
+ request_open_lib.checkpointName = req_exec_ckpt_sync_state->checkpointName;
+ request_open_lib.checkpointCreationAttributes = req_exec_ckpt_sync_state->checkpointCreationAttributes;
+ request_open_exec.req_lib_ckpt_checkpointopen = request_open_lib;
+ message_handler_req_exec_ckpt_checkpointopen(&request_open_exec, req_exec_ckpt_sync_state->source_addr, 0);
+
+ /* Step 2: describe the section carried by the message. */
+ request_section_create_lib.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONCREATE;
+ request_section_create_lib.idLen =
+ req_exec_ckpt_sync_state->section.sectionDescriptor.sectionId.idLen;
+ request_section_create_lib.expirationTime =
+ req_exec_ckpt_sync_state->section.sectionDescriptor.expirationTime;
+ request_section_create_lib.initialDataSize =
+ req_exec_ckpt_sync_state->section.sectionDescriptor.sectionSize;
+
+ /*
+ * Buffer laid out as: lib request, then section id, then section data.
+ * NOTE(review): this buffer is filled and freed but never passed to
+ * the section-create handler below, which receives
+ * request_section_create_exec instead.  If that handler expects the
+ * id and data to follow the request in memory, they are not there --
+ * confirm against the handler.
+ * NOTE(review): "data + ..." is arithmetic on void *, which is a
+ * compiler extension rather than standard C.
+ */
+ void *data = malloc(sizeof(struct req_lib_ckpt_sectioncreate)
+ + request_section_create_lib.idLen
+ + request_section_create_lib.initialDataSize);
+ assert(data);
+
+ memcpy(data, &request_section_create_lib, sizeof(struct req_lib_ckpt_sectioncreate));
+ memcpy(data + sizeof(struct req_lib_ckpt_sectioncreate),
+ req_exec_ckpt_sync_state->section.sectionDescriptor.sectionId.id,
+ request_section_create_lib.idLen);
+ memcpy(data + sizeof(struct req_lib_ckpt_sectioncreate) + request_section_create_lib.idLen,
+ req_exec_ckpt_sync_state->section.sectionData,
+ request_section_create_lib.initialDataSize);
+
+ /* Step 3: wrap the lib request in an exec request and replay it. */
+ request_section_create_exec.header.id = MESSAGE_REQ_EXEC_CKPT_SECTIONCREATE;
+ request_section_create_exec.header.size = sizeof (struct req_exec_ckpt_sectioncreate);
+
+ memcpy (&request_section_create_exec.req_lib_ckpt_sectioncreate,
+ &request_section_create_lib,
+ sizeof (struct req_lib_ckpt_sectioncreate));
+
+ memcpy (&request_section_create_exec.checkpointName,
+ &req_exec_ckpt_sync_state->checkpointName,
+ sizeof (SaNameT));
+
+ request_section_create_exec.source.in_addr.s_addr = req_exec_ckpt_sync_state->source_addr.s_addr;
+
+ message_handler_req_exec_ckpt_sectioncreate(&request_section_create_exec, req_exec_ckpt_sync_state->source_addr, 0);
+
+ free(data);
+
+ return (0);
+}
+
unsigned int abstime_to_msec (SaTimeT time)
{
struct timeval tv;
@@ -644,6 +984,20 @@
}
checkpoint->referenceCount--;
+ /*
+ * Modify the connection reference information to the Checkpoint to be
+ * sent out later as a part of the sync process.
+ * RECOVERY_MUNI
+ */
+ int proc_index = findProcessorIndex(&source_addr, checkpoint->ckpt_refcount);
+ if (proc_index != -1 ) {
+ checkpoint->ckpt_refcount[proc_index].count--;
+ }
+ else {
+ log_printf (LOG_LEVEL_ERROR,
+ "CKPT: Could Not find Processor Info %p info.\n",
+ checkpoint);
+ }
assert (checkpoint->referenceCount >= 0);
log_printf (LOG_LEVEL_DEBUG, "disconnect called, new CKPT ref count is %d\n",
checkpoint->referenceCount);
@@ -652,10 +1006,10 @@
* If checkpoint has been unlinked and this is the last reference, delete it
*/
if (checkpoint->unlinked && checkpoint->referenceCount == 0) {
- log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");
+ log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");
checkpoint_release (checkpoint);
} else
- if (checkpoint->referenceCount == 0) {
+ if (checkpoint->referenceCount == 0) {
poll_timer_add (aisexec_poll_handle,
checkpoint->checkpointCreationAttributes.retentionDuration / 1000000,
checkpoint,
--- ../latest/include/ipc_ckpt.h 2005-02-20 18:08:13.000000000 -0600
+++ ../bk_openais/include/ipc_ckpt.h 2005-02-21 10:34:49.000000000 -0600
@@ -37,6 +37,8 @@
#include "../include/ipc_gen.h"
#include "../include/ais_types.h"
#include "../include/saCkpt.h"
+#include "../exec/totemsrp.h"
+#include "../exec/ckpt.h"
enum req_lib_ckpt_checkpoint_types {
MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN = 1,
@@ -326,4 +328,14 @@
struct res_header header;
};
+/*
+ * Executive message carrying one checkpoint section from one node to the
+ * others during recovery.  Built in ckpt_recovery_inititialize() and
+ * consumed by message_handler_req_exec_ckpt_synchronize_state().
+ *
+ * NOTE(review): section is a byte copy of the sender's in-memory
+ * struct saCkptCheckpointSection, so it carries the sender's list
+ * linkage; if sectionData is a pointer it is not meaningful on the
+ * receiving node -- confirm how the section payload is meant to travel.
+ */
+struct req_exec_ckpt_synchronize_state {
+ struct req_header header; /* size and id, filled in by ckpt_recovery_process() */
+ struct memb_ring_id previous_ring_id; /* sender's saved_ring_id; receivers drop the message on mismatch */
+ SaNameT checkpointName; /* name of the checkpoint the section belongs to */
+ SaCkptCheckpointCreationAttributesT checkpointCreationAttributes; /* copied from the sender's checkpoint */
+ struct saCkptCheckpointSection section; /* byte copy of the sender's section structure */
+ struct in_addr source_addr; /* address of the sending node (its this_ip.sin_addr) */
+ SaUint32T ref_count; /* sender's ckpt_refcount count for this checkpoint */
+};
+
#endif /* IPC_CKPT_H_DEFINED */
--- ../latest/include/ipc_gen.h 2005-02-20 18:08:13.000000000 -0600
+++ ../bk_openais/include/ipc_gen.h 2005-02-17 13:59:10.000000000 -0600
@@ -69,6 +69,7 @@
MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE,
MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE,
MESSAGE_REQ_EXEC_CKPT_SECTIONREAD,
+ MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE,
MESSAGE_REQ_EXEC_EVT_EVENTDATA,
MESSAGE_REQ_EXEC_EVT_CHANCMD,
MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA
More information about the Openais
mailing list