[Openais] [RFC-COROSYNC] Add support for AMF

Steven Dake sdake at redhat.com
Wed Sep 17 10:27:14 PDT 2008


Looks good for commit.

We will address compatibility issues at a later time.

Regards
-steve

On Wed, 2008-09-17 at 21:55 +1200, Angus & Anna Salkeld wrote:
> Hi
> 
> This patch adds the functionality I need for AMF.
> 
> Steven: you said at some point that I might need to do
> something a bit more elaborate than this, so as not to break
> existing installations.  Can you tell me what you think, or
> suggest a better way of doing this?
> 
> Regards
> Angus
> 
> ---
>  exec/apidef.c                     |    5 +-
>  exec/sync.c                       |  136 +++++++++++++++++++++++++++++++++++++
>  exec/util.c                       |    5 ++
>  exec/util.h                       |    1 +
>  include/corosync/engine/coroapi.h |   17 +++++
>  5 files changed, 163 insertions(+), 1 deletions(-)
> 
> diff --git a/exec/apidef.c b/exec/apidef.c
> index 514f162..a3b3520 100644
> --- a/exec/apidef.c
> +++ b/exec/apidef.c
> @@ -45,6 +45,7 @@
>  #include <corosync/totem/totemip.h>
>  #include "main.h"
>  #include "ipc.h"
> +#include "sync.h"
>  #include <corosync/engine/coroapi.h>
>  #include "service.h"
>  #include <corosync/lcr/lcr_ifact.h>
> @@ -93,11 +94,13 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = {
>  	.tpg_joined_send_ok = totempg_groups_send_ok_joined,
>  	.tpg_groups_mcast = (typedef_tpg_groups_mcast)totempg_groups_mcast_groups,
>  	.tpg_groups_send_ok = (typedef_tpg_groups_send_ok)totempg_groups_send_ok_groups,
> +	.sync_request = sync_request,
>  	.service_link_and_init = corosync_service_link_and_init,
>  	.service_unlink_and_exit = corosync_service_unlink_and_exit,
>  	.plugin_interface_reference = lcr_ifact_reference,
>  	.plugin_interface_release = lcr_ifact_release,
> -	.error_memory_failure = NULL
> +	.error_memory_failure = _corosync_out_of_memory_error,
> +	.fatal_error = _corosync_exit_error
>  };
> 
>  void apidef_init (struct objdb_iface_ver0 *objdb) {
> diff --git a/exec/sync.c b/exec/sync.c
> index 01cf168..3a0b628 100644
> --- a/exec/sync.c
> +++ b/exec/sync.c
> @@ -63,6 +63,7 @@
>  LOGSYS_DECLARE_SUBSYS ("SYNC", LOG_INFO);
> 
>  #define MESSAGE_REQ_SYNC_BARRIER 0
> +#define MESSAGE_REQ_SYNC_REQUEST 1
> 
>  struct barrier_data {
>  	unsigned int nodeid;
> @@ -84,6 +85,7 @@ static void (*sync_synchronization_completed) (void);
>  static int sync_recovery_index = 0;
> 
>  static void *sync_callback_token_handle = 0;
> +static void *sync_request_token_handle;
> 
>  static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX];
> 
> @@ -126,12 +128,27 @@ static struct totempg_group sync_group = {
>  };
> 
>  static totempg_groups_handle sync_group_handle;
> +static char *service_name;
> +static unsigned int current_members[PROCESSOR_COUNT_MAX];
> +static unsigned int current_members_cnt;
> 
>  struct req_exec_sync_barrier_start {
>  	mar_req_header_t header;
>  	struct memb_ring_id ring_id;
>  };
> 
> +struct sync_request {
> +	uint32_t name_len;
> +	char name[0] __attribute__((aligned(8)));
> +};
> +
> +typedef struct sync_msg {
> +	mar_req_header_t header;
> +	struct memb_ring_id ring_id;
> +	struct sync_request sync_request;
> +} sync_msg_t;
> +
> +
>  /*
>   * Send a barrier data structure
>   */
> @@ -362,6 +379,7 @@ static void sync_deliver_fn (
>  {
>  	struct req_exec_sync_barrier_start *req_exec_sync_barrier_start =
>  		(struct req_exec_sync_barrier_start *)iovec[0].iov_base;
> +	sync_msg_t *msg = (sync_msg_t *)iovec[0].iov_base;
> 
>  	int i;
> 
> @@ -382,6 +400,36 @@ static void sync_deliver_fn (
>  		return;
>  	}
> 
> +	if (msg->header.id == MESSAGE_REQ_SYNC_REQUEST) {
> +		if (endian_conversion_required) {
> +			swab_mar_uint32_t (&msg->sync_request.name_len);
> +		}	
> +		/*
> +		 * If there is an ongoing sync, abort it. A requested sync is
> +		 * only allowed to abort other requested synchronizations,
> +		 * not full synchronizations.
> +		 */
> +		if (sync_processing && sync_callbacks.sync_abort) {
> +			sync_callbacks.sync_abort();
> +			sync_callbacks.sync_activate = NULL;
> +			sync_processing = 0;
> +			assert (service_name != NULL);
> +			free (service_name);
> +			service_name = NULL;
> +		}
> +
> +		service_name = malloc (msg->sync_request.name_len);
> +		strcpy (service_name, msg->sync_request.name);
> +
> +		/*
> +		 * Start requested synchronization
> +		 */
> +		sync_primary_callback_fn (current_members, current_members_cnt,	1,
> +			sync_ring_id);
> +
> +		return;
> +	}
> +
>  	/*
>  	 * Set completion for source_addr's address
>  	 */
> @@ -451,6 +499,7 @@ static void sync_confchg_fn (
>  	unsigned int *joined_list, int joined_list_entries,
>  	struct memb_ring_id *ring_id)
>  {
> +	int i;
>  	sync_ring_id = ring_id;
> 
>  	if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
> @@ -461,6 +510,14 @@ static void sync_confchg_fn (
>  		sync_callbacks.sync_activate = NULL;
>  	}
>  	/*
> +	 * Save current members and ring ID for later use
> +	 */
> +	for (i = 0; i < member_list_entries; i++) {
> +		current_members[i] = member_list[i];
> +	}
> +	current_members_cnt = member_list_entries;
> +
> +	/*
>  	 * If no virtual synchrony filter configured, then start
>  	 * synchronization process
>  	 */
> @@ -472,6 +529,60 @@ static void sync_confchg_fn (
>  			ring_id);
>  	}
>  }
> +/**
> + * TOTEM callback function used to multicast a sync_request
> + * message
> + * @param type
> + * @param _name
> + *
> + * @return int
> + */
> +static int sync_request_send (
> +	enum totem_callback_token_type type, void *_name)
> +{
> +	int res;
> +	char *name = _name;
> +	sync_msg_t msg;
> +	struct iovec iovec[2];
> +	int name_len;
> +
> +	ENTER("'%s'", name);
> +
> +	name_len = strlen (name) + 1;
> +	msg.header.size = sizeof (msg) + name_len;
> +	msg.header.id = MESSAGE_REQ_SYNC_REQUEST;
> +
> +	if (sync_ring_id == NULL) {
> +		log_printf (LOG_LEVEL_ERROR,
> +			"%s sync_ring_id is NULL.\n", __func__);
> +		return 1;
> +	}
> +	memcpy (&msg.ring_id, sync_ring_id,	sizeof (struct memb_ring_id));
> +	msg.sync_request.name_len = name_len;
> +
> +	iovec[0].iov_base = (char *)&msg;
> +	iovec[0].iov_len = sizeof (msg);
> +	iovec[1].iov_base = _name;
> +	iovec[1].iov_len = name_len;
> +
> +	res = totempg_groups_mcast_joined (
> +		sync_group_handle, iovec, 2, TOTEMPG_AGREED);
> +
> +	if (res == 0) {
> +		/*
> +		 * We managed to multicast the message so delete the token callback
> +		 * for the sync request.
> +		 */
> +		totempg_callback_token_destroy (&sync_request_token_handle);
> +	}
> +
> +	/*
> +	 * if we failed to multicast the message, this function will be called
> +	 * again.
> +	 */
> +
> +	return (0);
> +}
> 
>  int sync_in_process (void)
>  {
> @@ -486,3 +597,28 @@ int sync_primary_designated (void)
>  		return (vsf_iface->primary());
>  	}
>  }
> +
> +/**
> + * Execute synchronization upon request for the named service
> + * @param name
> + *
> + * @return int
> + */
> +int sync_request (char *name)
> +{
> +	assert (name != NULL);
> +
> +	ENTER("'%s'", name);
> +
> +	if (sync_processing) {
> +		return -1;
> +	}
> +
> +	totempg_callback_token_create (&sync_request_token_handle,
> +		TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */
> +		sync_request_send, name);
> +
> +	LEAVE("");
> +
> +	return 0;
> +}
> diff --git a/exec/util.c b/exec/util.c
> index 7939a4a..a96c440 100644
> --- a/exec/util.c
> +++ b/exec/util.c
> @@ -76,6 +76,11 @@ SaTimeT clust_time_now(void)
>  	return time_now;
>  }
> 
> +void _corosync_out_of_memory_error (void)
> +{
> +	assert (0==1);
> +	exit (EXIT_FAILURE);
> +}
> 
>  void _corosync_exit_error (
>  	enum e_ais_done err, const char *file, unsigned int line)
> diff --git a/exec/util.h b/exec/util.h
> index 7446f27..1d999d6 100644
> --- a/exec/util.h
> +++ b/exec/util.h
> @@ -71,6 +71,7 @@ extern int mar_name_match(mar_name_t *name1, mar_name_t *name2);
>  #define corosync_exit_error(err) _corosync_exit_error ((err), __FILE__, __LINE__)
>  extern void _corosync_exit_error (
>  	enum e_ais_done err, const char *file, unsigned int line);
> +void _corosync_out_of_memory_error (void);
>  extern char *getSaNameT (SaNameT *name);
>  extern char *strstr_rs (const char *haystack, const char *needle);
>  extern void setSaNameT (SaNameT *name, char *str);
> diff --git a/include/corosync/engine/coroapi.h b/include/corosync/engine/coroapi.h
> index 1b9dae2..d2abd12 100644
> --- a/include/corosync/engine/coroapi.h
> +++ b/include/corosync/engine/coroapi.h
> @@ -64,6 +64,8 @@ struct corosync_tpg_group {
>  #define TOTEM_AGREED	0
>  #define TOTEM_SAFE	1
> 
> +#define MILLI_2_NANO_SECONDS 1000000ULL
> +
>  #if !defined(TOTEM_IP_ADDRESS)
>  struct totem_ip_address {
>  	unsigned int   nodeid;
> @@ -98,6 +100,16 @@ enum corosync_flow_control_state {
>  };
>  #endif
> 
> +typedef enum {
> +	COROSYNC_FATAL_ERROR_EXIT = -1,
> +	COROSYNC_LIBAIS_SOCKET = -6,
> +	COROSYNC_LIBAIS_BIND = -7,
> +	COROSYNC_READKEY = -8,
> +	COROSYNC_INVALID_CONFIG = -9,
> +	COROSYNC_DYNAMICLOAD = -12,
> +	COROSYNC_OUT_OF_MEMORY = -15,
> +	COROSYNC_FATAL_ERR = -16
> +} corosync_fatal_error_t;
> 
>  #ifndef OBJECT_PARENT_HANDLE
> 
> @@ -440,6 +452,9 @@ struct corosync_api_v1 {
>  		struct iovec *iovec,
>  		int iov_len);
> 
> +	int (*sync_request) (
> +		char *service_name);
> +
>  	/*
>  	 * Plugin loading and unloading
>  	 */
> @@ -469,6 +484,8 @@ struct corosync_api_v1 {
>  	 * Error handling APIs
>  	 */
>  	void (*error_memory_failure) (void);
> +#define corosync_fatal_error(err) api->fatal_error ((err), __FILE__, __LINE__)
> +	void (*fatal_error) (corosync_fatal_error_t err, const char *file, unsigned int line);
>  };
> 
>  #define SERVICE_ID_MAKE(a,b) ( ((a)<<16) | (b) )
> --
> _______________________________________________
> Openais mailing list
> Openais at lists.linux-foundation.org
> https://lists.linux-foundation.org/mailman/listinfo/openais



More information about the Openais mailing list