[Openais] Patch II design review !!

Muni Bajpai muniba at nortel.com
Mon Feb 21 11:10:31 PST 2005


Skipped content of type multipart/alternative
-------------- next part --------------
--- ../latest/exec/ckpt.c	2005-02-20 18:08:13.000000000 -0600
+++ ../bk_openais/exec/ckpt.c	2005-02-21 12:14:02.000000000 -0600
@@ -63,11 +63,22 @@
 
 DECLARE_LIST_INIT(checkpointIteratorListHead);
 
+DECLARE_LIST_INIT(recoverySyncListHead);	/* queued struct checkpoint_sync messages */
+
+DECLARE_LIST_INIT(checkpointRecoveryListHead);	/* checkpoint copies saved for recovery */
+
 struct checkpoint_cleanup {
     struct list_head list;
     struct saCkptCheckpoint *checkpoint;
 };
 
+struct checkpoint_sync {	/* a queued synchronize-state message */
+	struct list_head list;
+	struct req_exec_ckpt_synchronize_state request;
+};
+
+static void *tok_call_handle = NULL;	/* only used by the disabled (#if 0) token callback code */
+
 //TODO static totempg_recovery_plug_handle ckpt_checkpoint_recovery_plug_handle;
 
 static int ckpt_exec_init_fn (void);
@@ -78,6 +89,8 @@
 
 static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct in_addr source_addr, int endian_conversion_required);
 
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required);
+
 static int message_handler_req_exec_ckpt_checkpointclose (void *message, struct in_addr source_addr, int endian_conversion_required);
 
 static int message_handler_req_exec_ckpt_checkpointunlink (void *message, struct in_addr source_addr, int endian_conversion_required);
@@ -133,24 +146,24 @@
 static int message_handler_req_lib_ckpt_sectioniteratorinitialize (struct conn_info *conn_info, void *message);
 static int message_handler_req_lib_ckpt_sectioniteratornext (struct conn_info *conn_info, void *message);
 
-static int ckpt_confchg_fn (
-	enum totempg_configuration_type configuration_type,
-	struct in_addr *member_list, void *member_list_private,
-		int member_list_entries,
-	struct in_addr *left_list, void *left_list_private,
-		int left_list_entries,
-	struct in_addr *joined_list, void *joined_list_private,
-		int joined_list_entries,
-	struct memb_ring_id *ring_id) {
+/* RECOVERY_MUNI: checkpoint synchronization after configuration changes. */
+static void ckpt_recovery_inititialize (void);
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *data);
+static int ckpt_recovery_finalize (void);
+static int ckpt_recovery_abort (void);
+static int ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries);
 
-#ifdef TODO
-	if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
-		totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
-	}
-#endif
+static struct memb_ring_id saved_ring_id;	/* ring id recorded at the last REGULAR configuration */
 
-	return (0);
-}
+static int ckpt_confchg_fn(
+	enum totempg_configuration_type configuration_type,
+	struct in_addr *member_list, void *member_list_private,
+		int member_list_entries,
+	struct in_addr *left_list, void *left_list_private,
+		int left_list_entries,
+	struct in_addr *joined_list, void *joined_list_private,
+		int joined_list_entries,
+	struct memb_ring_id *ring_id);
 
 struct libais_handler ckpt_libais_handlers[] =
 {
@@ -258,7 +271,8 @@
 	message_handler_req_exec_ckpt_sectionexpirationtimeset,
 	message_handler_req_exec_ckpt_sectionwrite,
 	message_handler_req_exec_ckpt_sectionoverwrite,
-	message_handler_req_exec_ckpt_sectionread
+	message_handler_req_exec_ckpt_sectionread,
+	message_handler_req_exec_ckpt_synchronize_state	/* NOTE(review): position presumably must match MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE */
 };
 
 struct service_handler ckpt_service_handler = {
@@ -273,7 +287,298 @@
 	.exec_dump_fn				= 0
 };
 
-static struct memb_ring_id saved_ring_id;
+/*
+ * Claim a slot for proc_addr in ckpt_refcount: store it in the first empty
+ * slot (s_addr == 0) and return that index.
+ *
+ * Returns -1 when proc_addr already has a slot or every slot is in use, so
+ * callers look the processor up with findProcessorIndex() first.
+ * RECOVERY_MUNI
+ */
+static int setProcessorIndex(struct in_addr *proc_addr,
+	struct ckpt_refcnt ckpt_refcount[])
+{
+	int i;
+
+	for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
+		if (ckpt_refcount[i].addr.s_addr == proc_addr->s_addr) {
+			/* Already present: nothing to claim. */
+			return (-1);
+		}
+		if (ckpt_refcount[i].addr.s_addr == 0) {
+			memcpy (&ckpt_refcount[i].addr, proc_addr, sizeof (struct in_addr));
+			return (i);
+		}
+	}
+	/* No empty slot left for a new processor. */
+	return (-1);
+}
+
+/*
+ * Return the index of proc_addr in ckpt_refcount, or -1 if it has no slot.
+ * RECOVERY_MUNI
+ */
+static int findProcessorIndex(struct in_addr *proc_addr,
+	struct ckpt_refcnt ckpt_refcount[])
+{
+	int i;
+
+	for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
+		if (ckpt_refcount[i].addr.s_addr == proc_addr->s_addr) {
+			return (i);
+		}
+	}
+	return (-1);
+}
+
+/* Clear all PROCESSOR_COUNT_MAX per-processor reference count slots. */
+static void initialize_ckpt_refcount_array (struct ckpt_refcnt ckpt_refcount[])
+{
+	memset (ckpt_refcount, 0, PROCESSOR_COUNT_MAX * sizeof (struct ckpt_refcnt));
+}
+
+/*
+ * Copy every checkpoint onto checkpointRecoveryListHead and queue, on
+ * recoverySyncListHead, one synchronize-state message per section of each
+ * checkpoint this processor holds open, carrying its open count.
+ *
+ * NOTE(review): the copy is shallow, so its checkpointSectionsListHead still
+ * links the original's section nodes, and each message embeds the section
+ * structure (including any pointers it holds) rather than serialized data.
+ * Both need addressing before this path is enabled.
+ */
+static void ckpt_recovery_inititialize (void)
+{
+	struct list_head *checkpointList;
+	struct list_head *checkpointSectionList;
+	struct saCkptCheckpoint *checkpoint;
+	struct saCkptCheckpoint *savedCheckpoint;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	struct checkpoint_sync *sync_msg;
+	int proc_index;
+
+	for (checkpointList = checkpointListHead.next;
+		checkpointList != &checkpointListHead;
+		checkpointList = checkpointList->next) {
+
+		checkpoint = list_entry (checkpointList,
+			struct saCkptCheckpoint, list);
+
+		/* Save off a copy of the checkpoint in the recovery list. */
+		savedCheckpoint = malloc (sizeof (struct saCkptCheckpoint));
+		assert (savedCheckpoint);
+		memcpy (savedCheckpoint, checkpoint, sizeof (struct saCkptCheckpoint));
+		list_init (&savedCheckpoint->list);
+		list_add (&savedCheckpoint->list, &checkpointRecoveryListHead);
+
+		/*
+		 * The processor slot does not depend on the section: look it up
+		 * once, before any sync message is allocated, so nothing leaks.
+		 */
+		proc_index = findProcessorIndex (&this_ip.sin_addr,
+			savedCheckpoint->ckpt_refcount);
+		if (proc_index == -1) {
+			log_printf (LOG_LEVEL_ERROR,
+				"CKPT: Could not find the checkpoint entry for this processor %p \n",
+				checkpoint);
+			continue;
+		}
+
+		for (checkpointSectionList = checkpoint->checkpointSectionsListHead.next;
+			checkpointSectionList != &checkpoint->checkpointSectionsListHead;
+			checkpointSectionList = checkpointSectionList->next) {
+
+			ckptCheckpointSection = list_entry (checkpointSectionList,
+				struct saCkptCheckpointSection, list);
+
+			/* Create and queue a sync message for this section. */
+			sync_msg = malloc (sizeof (struct checkpoint_sync));
+			assert (sync_msg);
+			memset (sync_msg, 0, sizeof (struct checkpoint_sync));
+			memcpy (&sync_msg->request.checkpointName,
+				&savedCheckpoint->name, sizeof (SaNameT));
+			memcpy (&sync_msg->request.checkpointCreationAttributes,
+				&savedCheckpoint->checkpointCreationAttributes,
+				sizeof (SaCkptCheckpointCreationAttributesT));
+			memcpy (&sync_msg->request.section, ckptCheckpointSection,
+				sizeof (struct saCkptCheckpointSection));
+			sync_msg->request.previous_ring_id = saved_ring_id;
+			sync_msg->request.source_addr = this_ip.sin_addr;
+			memcpy (&sync_msg->request.ref_count,
+				&savedCheckpoint->ckpt_refcount[proc_index].count,
+				sizeof (SaUint32T));
+
+			list_init (&sync_msg->list);
+			list_add (&sync_msg->list, &recoverySyncListHead);
+		}
+	}
+#if 0
+	int res = totempg_token_callback_create(&tok_call_handle,
+		TOTEMPG_CALLBACK_TOKEN_RECEIVED,
+		0,
+		ckpt_recovery_process,
+		NULL);
+#endif
+}
+
+/*
+ * Token callback: multicast the synchronize-state message at the head of
+ * recoverySyncListHead, then unlink and free it.
+ *
+ * Returns 0, or -1 if totempg_mcast() fails (the message then stays queued).
+ */
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *data)
+{
+	struct req_exec_ckpt_synchronize_state *request_exec_sync_state;
+	struct checkpoint_sync *sync_element;
+	struct iovec iovecs[1];
+	int res;
+
+	if (list_empty (&recoverySyncListHead)) {
+		return (0);
+	}
+
+	sync_element = list_entry (recoverySyncListHead.next,
+		struct checkpoint_sync, list);
+	request_exec_sync_state = &sync_element->request;
+
+	log_printf (LOG_LEVEL_DEBUG, "callback recover_process.\n");
+	request_exec_sync_state->header.size =
+		sizeof (struct req_exec_ckpt_synchronize_state);
+	request_exec_sync_state->header.id = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE;
+
+	/* The iovec must reference the message itself, not the pointer to it. */
+	iovecs[0].iov_base = (char *)request_exec_sync_state;
+	iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_state);
+
+	/* Kept out of assert() so the multicast still happens under NDEBUG. */
+	res = totempg_mcast (iovecs, 1, TOTEMPG_AGREED);
+	if (res != 0) {
+		log_printf (LOG_LEVEL_ERROR,
+			"CKPT: Could not multicast the sync message.\n");
+		return (-1);
+	}
+	list_del (&sync_element->list);
+	free (sync_element);
+
+	if (list_empty (&recoverySyncListHead)) {
+		/* TODO: call ckpt_recovery_finalize() here. */
+		return (0);
+	}
+#if 0
+	/* We have more to send ... */
+	res = totempg_token_callback_create(&tok_call_handle,
+		TOTEMPG_CALLBACK_TOKEN_RECEIVED,
+		0,
+		ckpt_recovery_process,
+		NULL);
+#endif
+	return (0);
+}
+
+/*
+ * Replace the live checkpoint list with the copies saved by
+ * ckpt_recovery_inititialize(): free every live checkpoint, move the saved
+ * entries onto checkpointListHead and leave checkpointRecoveryListHead empty.
+ *
+ * NOTE(review): only the checkpoint structures are freed, not their sections.
+ */
+static int ckpt_recovery_finalize (void)
+{
+	struct list_head *checkpointList;
+	struct saCkptCheckpoint *checkpoint;
+
+	while (!list_empty (&checkpointListHead)) {
+		checkpointList = checkpointListHead.next;
+		checkpoint = list_entry (checkpointList,
+			struct saCkptCheckpoint, list);
+		list_del (&checkpoint->list);
+		free (checkpoint);
+	}
+
+	if (list_empty (&checkpointRecoveryListHead)) {
+		list_init (&checkpointListHead);
+	} else {
+		/*
+		 * Copying the head alone leaves the first and last entries
+		 * linked to checkpointRecoveryListHead; re-point them.
+		 */
+		checkpointListHead = checkpointRecoveryListHead;
+		checkpointListHead.next->prev = &checkpointListHead;
+		checkpointListHead.prev->next = &checkpointListHead;
+	}
+	list_init (&checkpointRecoveryListHead);
+
+	return (0);
+}
+
+static int ckpt_recovery_abort (void)
+{
+	/* TODO: add code here to abort the recovery process. */
+	return (0);
+}
+
+/*
+ * Zero, on every checkpoint, the open count recorded for each of the
+ * left_list_entries processors in left_list.
+ */
+static int ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries)
+{
+	struct list_head *checkpointList;
+	struct saCkptCheckpoint *checkpoint;
+	int proc_index;
+	int i;
+
+	/* Every departed processor, including the last one in left_list. */
+	for (i = 0; i < left_list_entries; i++) {
+		for (checkpointList = checkpointListHead.next;
+			checkpointList != &checkpointListHead;
+			checkpointList = checkpointList->next) {
+
+			checkpoint = list_entry (checkpointList,
+				struct saCkptCheckpoint, list);
+
+			proc_index = findProcessorIndex (&left_list[i],
+				checkpoint->ckpt_refcount);
+			if (proc_index == -1) {
+				continue;
+			}
+			checkpoint->ckpt_refcount[proc_index].count = 0;
+			/*
+			 * TODO: should referenceCount be decremented too, or is
+			 * that done in close?
+			 */
+		}
+	}
+	return (0);
+}
+
+/*
+ * Configuration change handler.  A REGULAR configuration records the new
+ * ring id in saved_ring_id; a TRANSITIONAL one clears the open counts of
+ * the processors in left_list.
+ */
+static int ckpt_confchg_fn (
+	enum totempg_configuration_type configuration_type,
+	struct in_addr *member_list, void *member_list_private,
+		int member_list_entries,
+	struct in_addr *left_list, void *left_list_private,
+		int left_list_entries,
+	struct in_addr *joined_list, void *joined_list_private,
+		int joined_list_entries,
+	struct memb_ring_id *ring_id) {
+
+	if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
+#ifdef TODO
+		totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
+#endif
+		memcpy (&saved_ring_id, ring_id, sizeof (struct memb_ring_id)); /* RECOVERY_MUNI */
+	} else if (configuration_type == TOTEMPG_CONFIGURATION_TRANSITIONAL) {
+		ckpt_recovery_process_members_exit (left_list, left_list_entries);
+	}
+
+	return (0);
+}
 
 static struct saCkptCheckpoint *ckpt_checkpoint_find_global (SaNameT *name)
 {
@@ -515,6 +776,7 @@
 		ckptCheckpoint->referenceCount = 0;
 		ckptCheckpoint->retention_timer = 0;
 		ckptCheckpoint->expired = 0;
+		initialize_ckpt_refcount_array(ckptCheckpoint->ckpt_refcount);
 
 		/*
 		 * Add in default checkpoint section
@@ -548,7 +810,29 @@
 	 */
 	log_printf (LOG_LEVEL_DEBUG, "CHECKPOINT opened is %p\n", ckptCheckpoint);
 	ckptCheckpoint->referenceCount += 1;
 
+	/*
+	 * Count this open against source_addr so the per-processor counts can
+	 * be sent out later as part of the sync process.
+	 * RECOVERY_MUNI
+	 */
+	{
+		int proc_index = findProcessorIndex (&source_addr, ckptCheckpoint->ckpt_refcount);
+
+		if (proc_index == -1) {
+			/* First open from this processor: claim a free slot. */
+			proc_index = setProcessorIndex (&source_addr, ckptCheckpoint->ckpt_refcount);
+		}
+		if (proc_index != -1) {
+			/* The slot's addr already equals source_addr. */
+			ckptCheckpoint->ckpt_refcount[proc_index].count++;
+		} else {
+			log_printf (LOG_LEVEL_ERROR,
+				"CKPT: MAX LIMIT OF PROCESSORS reached. Cannot store new proc %p info.\n",
+				ckptCheckpoint);
+		}
+	}
+
 	/*
 	 * Reset retention duration since this checkpoint was just opened
 	 */
@@ -585,6 +865,93 @@
 	return (0);
 }
 
+/*
+ * Handle a synchronize-state message multicast during checkpoint recovery.
+ * Messages stamped with a ring id other than saved_ring_id are ignored;
+ * otherwise the checkpoint is opened on behalf of the message's source_addr
+ * and the carried section is created in it.
+ *
+ * NOTE(review): sectionId.id and sectionData are copied verbatim from the
+ * sender's in-memory section, so they are presumably only meaningful on the
+ * sending processor.  ref_count is not applied: each message does one open.
+ * RECOVERY_MUNI
+ */
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required)
+{
+	struct req_exec_ckpt_synchronize_state *req_exec_ckpt_sync_state =
+		(struct req_exec_ckpt_synchronize_state *)message;
+	struct req_exec_ckpt_checkpointopen request_open_exec;
+	struct req_lib_ckpt_sectioncreate request_section_create_lib;
+	struct req_exec_ckpt_sectioncreate *request_section_create_exec;
+	size_t id_len;
+	size_t data_size;
+	char *buffer;
+
+	if ((req_exec_ckpt_sync_state->previous_ring_id.seq != saved_ring_id.seq)
+		|| (req_exec_ckpt_sync_state->previous_ring_id.rep.s_addr != saved_ring_id.rep.s_addr)) {
+		return (0);
+	}
+
+	/* Zero every field not set below so no stack garbage reaches the handler. */
+	memset (&request_open_exec, 0, sizeof (request_open_exec));
+	request_open_exec.req_lib_ckpt_checkpointopen.checkpointName =
+		req_exec_ckpt_sync_state->checkpointName;
+	request_open_exec.req_lib_ckpt_checkpointopen.checkpointCreationAttributes =
+		req_exec_ckpt_sync_state->checkpointCreationAttributes;
+	message_handler_req_exec_ckpt_checkpointopen (&request_open_exec,
+		req_exec_ckpt_sync_state->source_addr, 0);
+
+	memset (&request_section_create_lib, 0, sizeof (request_section_create_lib));
+	request_section_create_lib.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONCREATE;
+	request_section_create_lib.idLen =
+		req_exec_ckpt_sync_state->section.sectionDescriptor.sectionId.idLen;
+	request_section_create_lib.expirationTime =
+		req_exec_ckpt_sync_state->section.sectionDescriptor.expirationTime;
+	request_section_create_lib.initialDataSize =
+		req_exec_ckpt_sync_state->section.sectionDescriptor.sectionSize;
+	id_len = request_section_create_lib.idLen;
+	data_size = request_section_create_lib.initialDataSize;
+
+	/*
+	 * Hand the section-create handler the exec request with the section id
+	 * and the initial data appended directly after it.  NOTE(review): this
+	 * assumes the handler reads that payload from just past
+	 * struct req_exec_ckpt_sectioncreate -- confirm against the handler.
+	 */
+	buffer = malloc (sizeof (struct req_exec_ckpt_sectioncreate) + id_len + data_size);
+	assert (buffer);
+	request_section_create_exec = (struct req_exec_ckpt_sectioncreate *)buffer;
+	memset (request_section_create_exec, 0, sizeof (struct req_exec_ckpt_sectioncreate));
+	request_section_create_exec->header.id = MESSAGE_REQ_EXEC_CKPT_SECTIONCREATE;
+	request_section_create_exec->header.size = sizeof (struct req_exec_ckpt_sectioncreate);
+	memcpy (&request_section_create_exec->req_lib_ckpt_sectioncreate,
+		&request_section_create_lib,
+		sizeof (struct req_lib_ckpt_sectioncreate));
+	memcpy (&request_section_create_exec->checkpointName,
+		&req_exec_ckpt_sync_state->checkpointName,
+		sizeof (SaNameT));
+	request_section_create_exec->source.in_addr.s_addr =
+		req_exec_ckpt_sync_state->source_addr.s_addr;
+	/* The id/data pointers may be NULL when their length is 0. */
+	if (id_len > 0) {
+		memcpy (buffer + sizeof (struct req_exec_ckpt_sectioncreate),
+			req_exec_ckpt_sync_state->section.sectionDescriptor.sectionId.id,
+			id_len);
+	}
+	if (data_size > 0) {
+		memcpy (buffer + sizeof (struct req_exec_ckpt_sectioncreate) + id_len,
+			req_exec_ckpt_sync_state->section.sectionData,
+			data_size);
+	}
+
+	message_handler_req_exec_ckpt_sectioncreate (request_section_create_exec,
+		req_exec_ckpt_sync_state->source_addr, 0);
+
+	free (buffer);
+
+	return (0);
+}
+
 unsigned int abstime_to_msec (SaTimeT time)
 {
 	struct timeval tv;
@@ -644,6 +984,23 @@
 	}
 
 	checkpoint->referenceCount--;
+	/*
+	 * Drop this close from source_addr's per-processor open count, which is
+	 * sent out later as part of the sync process.  The count is never taken
+	 * below zero; it may already have been cleared for a departed processor.
+	 * RECOVERY_MUNI
+	 */
+	{
+		int proc_index = findProcessorIndex (&source_addr, checkpoint->ckpt_refcount);
+
+		if (proc_index == -1) {
+			log_printf (LOG_LEVEL_ERROR,
+				"CKPT: Could Not find Processor Info %p info.\n",
+				checkpoint);
+		} else if (checkpoint->ckpt_refcount[proc_index].count > 0) {
+			checkpoint->ckpt_refcount[proc_index].count--;
+		}
+	}
 	assert (checkpoint->referenceCount >= 0);
 	log_printf (LOG_LEVEL_DEBUG, "disconnect called, new CKPT ref count is %d\n", 
 		checkpoint->referenceCount);
--- ../latest/include/ipc_ckpt.h	2005-02-20 18:08:13.000000000 -0600
+++ ../bk_openais/include/ipc_ckpt.h	2005-02-21 10:34:49.000000000 -0600
@@ -37,6 +37,8 @@
 #include "../include/ipc_gen.h"
 #include "../include/ais_types.h"
 #include "../include/saCkpt.h"
+#include "../exec/totemsrp.h"	/* NOTE(review): include/ header now depends on exec/ internals; presumably for struct memb_ring_id */
+#include "../exec/ckpt.h"	/* NOTE(review): presumably for struct saCkptCheckpointSection */
 
 enum req_lib_ckpt_checkpoint_types {
 	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN = 1,
@@ -326,4 +328,14 @@
 	struct res_header header;
 };
 
+struct req_exec_ckpt_synchronize_state {	/* one checkpoint section, multicast during checkpoint recovery */
+	struct req_header header;
+	struct memb_ring_id previous_ring_id;	/* sender's saved ring id; receivers drop messages that do not match theirs */
+	SaNameT checkpointName;
+	SaCkptCheckpointCreationAttributesT checkpointCreationAttributes;
+	struct saCkptCheckpointSection section;	/* NOTE(review): raw in-memory copy; any pointer members are only valid on the sender */
+	struct in_addr source_addr;	/* processor on whose behalf the checkpoint is reopened */
+	SaUint32T ref_count;	/* sender's open count for this checkpoint */
+};
+
 #endif /* IPC_CKPT_H_DEFINED */
--- ../latest/include/ipc_gen.h	2005-02-20 18:08:13.000000000 -0600
+++ ../bk_openais/include/ipc_gen.h	2005-02-17 13:59:10.000000000 -0600
@@ -69,6 +69,7 @@
 	MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE,
 	MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE,
 	MESSAGE_REQ_EXEC_CKPT_SECTIONREAD,
+	MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE,	/* NOTE(review): inserting here renumbers every EVT id below; position presumably has to match the ckpt exec handler table in exec/ckpt.c */
 	MESSAGE_REQ_EXEC_EVT_EVENTDATA,
 	MESSAGE_REQ_EXEC_EVT_CHANCMD,
 	MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA


More information about the Openais mailing list