[Openais] [RFC-COROSYNC] Add support for AMF
Steven Dake
sdake at redhat.com
Wed Sep 17 10:27:14 PDT 2008
Looks good for commit.
We will address compatibility issues at a later time.
Regards
-steve
On Wed, 2008-09-17 at 21:55 +1200, Angus & Anna Salkeld wrote:
> Hi
>
> This patch adds the functionality I need for AMF.
>
> Steven: you said at some point that I might need to do
> something a bit more elaborate than this, so as not to break
> existing installations. Can you tell me what you think, or
> suggest a better way of doing this?
>
> Regards
> Angus
>
> ---
> exec/apidef.c | 5 +-
> exec/sync.c | 136 +++++++++++++++++++++++++++++++++++++
> exec/util.c | 5 ++
> exec/util.h | 1 +
> include/corosync/engine/coroapi.h | 17 +++++
> 5 files changed, 163 insertions(+), 1 deletions(-)
>
> diff --git a/exec/apidef.c b/exec/apidef.c
> index 514f162..a3b3520 100644
> --- a/exec/apidef.c
> +++ b/exec/apidef.c
> @@ -45,6 +45,7 @@
> #include <corosync/totem/totemip.h>
> #include "main.h"
> #include "ipc.h"
> +#include "sync.h"
> #include <corosync/engine/coroapi.h>
> #include "service.h"
> #include <corosync/lcr/lcr_ifact.h>
> @@ -93,11 +94,13 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = {
> .tpg_joined_send_ok = totempg_groups_send_ok_joined,
> .tpg_groups_mcast = (typedef_tpg_groups_mcast)totempg_groups_mcast_groups,
> .tpg_groups_send_ok = (typedef_tpg_groups_send_ok)totempg_groups_send_ok_groups,
> + .sync_request = sync_request,
> .service_link_and_init = corosync_service_link_and_init,
> .service_unlink_and_exit = corosync_service_unlink_and_exit,
> .plugin_interface_reference = lcr_ifact_reference,
> .plugin_interface_release = lcr_ifact_release,
> - .error_memory_failure = NULL
> + .error_memory_failure = _corosync_out_of_memory_error,
> + .fatal_error = _corosync_exit_error
> };
>
> void apidef_init (struct objdb_iface_ver0 *objdb) {
> diff --git a/exec/sync.c b/exec/sync.c
> index 01cf168..3a0b628 100644
> --- a/exec/sync.c
> +++ b/exec/sync.c
> @@ -63,6 +63,7 @@
> LOGSYS_DECLARE_SUBSYS ("SYNC", LOG_INFO);
>
> #define MESSAGE_REQ_SYNC_BARRIER 0
> +#define MESSAGE_REQ_SYNC_REQUEST 1
>
> struct barrier_data {
> unsigned int nodeid;
> @@ -84,6 +85,7 @@ static void (*sync_synchronization_completed) (void);
> static int sync_recovery_index = 0;
>
> static void *sync_callback_token_handle = 0;
> +static void *sync_request_token_handle;
>
> static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX];
>
> @@ -126,12 +128,27 @@ static struct totempg_group sync_group = {
> };
>
> static totempg_groups_handle sync_group_handle;
> +static char *service_name;
> +static unsigned int current_members[PROCESSOR_COUNT_MAX];
> +static unsigned int current_members_cnt;
>
> struct req_exec_sync_barrier_start {
> mar_req_header_t header;
> struct memb_ring_id ring_id;
> };
>
> +struct sync_request {
> + uint32_t name_len;
> + char name[0] __attribute__((aligned(8)));
> +};
> +
> +typedef struct sync_msg {
> + mar_req_header_t header;
> + struct memb_ring_id ring_id;
> + struct sync_request sync_request;
> +} sync_msg_t;
> +
> +
> /*
> * Send a barrier data structure
> */
> @@ -362,6 +379,7 @@ static void sync_deliver_fn (
> {
> struct req_exec_sync_barrier_start *req_exec_sync_barrier_start =
> (struct req_exec_sync_barrier_start *)iovec[0].iov_base;
> + sync_msg_t *msg = (sync_msg_t *)iovec[0].iov_base;
>
> int i;
>
> @@ -382,6 +400,36 @@ static void sync_deliver_fn (
> return;
> }
>
> + if (msg->header.id == MESSAGE_REQ_SYNC_REQUEST) {
> + if (endian_conversion_required) {
> + swab_mar_uint32_t (&msg->sync_request.name_len);
> + }
> + /*
> + * If there is an ongoing sync, abort it. A requested sync is
> + * only allowed to abort other requested synchronizations,
> + * not full synchronizations.
> + */
> + if (sync_processing && sync_callbacks.sync_abort) {
> + sync_callbacks.sync_abort();
> + sync_callbacks.sync_activate = NULL;
> + sync_processing = 0;
> + assert (service_name != NULL);
> + free (service_name);
> + service_name = NULL;
> + }
> +
> + service_name = malloc (msg->sync_request.name_len);
> + strcpy (service_name, msg->sync_request.name);
> +
> + /*
> + * Start requested synchronization
> + */
> + sync_primary_callback_fn (current_members, current_members_cnt, 1,
> + sync_ring_id);
> +
> + return;
> + }
> +
> /*
> * Set completion for source_addr's address
> */
> @@ -451,6 +499,7 @@ static void sync_confchg_fn (
> unsigned int *joined_list, int joined_list_entries,
> struct memb_ring_id *ring_id)
> {
> + int i;
> sync_ring_id = ring_id;
>
> if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
> @@ -461,6 +510,14 @@ static void sync_confchg_fn (
> sync_callbacks.sync_activate = NULL;
> }
> /*
> + * Save current members and ring ID for later use
> + */
> + for (i = 0; i < member_list_entries; i++) {
> + current_members[i] = member_list[i];
> + }
> + current_members_cnt = member_list_entries;
> +
> + /*
> * If no virtual synchrony filter configured, then start
> * synchronization process
> */
> @@ -472,6 +529,60 @@ static void sync_confchg_fn (
> ring_id);
> }
> }
> +/**
> + * TOTEM callback function used to multicast a sync_request
> + * message
> + * @param type
> + * @param _name
> + *
> + * @return int
> + */
> +static int sync_request_send (
> + enum totem_callback_token_type type, void *_name)
> +{
> + int res;
> + char *name = _name;
> + sync_msg_t msg;
> + struct iovec iovec[2];
> + int name_len;
> +
> + ENTER("'%s'", name);
> +
> + name_len = strlen (name) + 1;
> + msg.header.size = sizeof (msg) + name_len;
> + msg.header.id = MESSAGE_REQ_SYNC_REQUEST;
> +
> + if (sync_ring_id == NULL) {
> + log_printf (LOG_LEVEL_ERROR,
> + "%s sync_ring_id is NULL.\n", __func__);
> + return 1;
> + }
> + memcpy (&msg.ring_id, sync_ring_id, sizeof (struct memb_ring_id));
> + msg.sync_request.name_len = name_len;
> +
> + iovec[0].iov_base = (char *)&msg;
> + iovec[0].iov_len = sizeof (msg);
> + iovec[1].iov_base = _name;
> + iovec[1].iov_len = name_len;
> +
> + res = totempg_groups_mcast_joined (
> + sync_group_handle, iovec, 2, TOTEMPG_AGREED);
> +
> + if (res == 0) {
> + /*
> + * We managed to multicast the message so delete the token callback
> + * for the sync request.
> + */
> + totempg_callback_token_destroy (&sync_request_token_handle);
> + }
> +
> + /*
> + * if we failed to multicast the message, this function will be called
> + * again.
> + */
> +
> + return (0);
> +}
>
> int sync_in_process (void)
> {
> @@ -486,3 +597,28 @@ int sync_primary_designated (void)
> return (vsf_iface->primary());
> }
> }
> +
> +/**
> + * Execute synchronization upon request for the named service
> + * @param name
> + *
> + * @return int
> + */
> +int sync_request (char *name)
> +{
> + assert (name != NULL);
> +
> + ENTER("'%s'", name);
> +
> + if (sync_processing) {
> + return -1;
> + }
> +
> + totempg_callback_token_create (&sync_request_token_handle,
> + TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */
> + sync_request_send, name);
> +
> + LEAVE("");
> +
> + return 0;
> +}
> diff --git a/exec/util.c b/exec/util.c
> index 7939a4a..a96c440 100644
> --- a/exec/util.c
> +++ b/exec/util.c
> @@ -76,6 +76,11 @@ SaTimeT clust_time_now(void)
> return time_now;
> }
>
> +void _corosync_out_of_memory_error (void)
> +{
> + assert (0==1);
> + exit (EXIT_FAILURE);
> +}
>
> void _corosync_exit_error (
> enum e_ais_done err, const char *file, unsigned int line)
> diff --git a/exec/util.h b/exec/util.h
> index 7446f27..1d999d6 100644
> --- a/exec/util.h
> +++ b/exec/util.h
> @@ -71,6 +71,7 @@ extern int mar_name_match(mar_name_t *name1, mar_name_t *name2);
> #define corosync_exit_error(err) _corosync_exit_error ((err), __FILE__, __LINE__)
> extern void _corosync_exit_error (
> enum e_ais_done err, const char *file, unsigned int line);
> +void _corosync_out_of_memory_error (void);
> extern char *getSaNameT (SaNameT *name);
> extern char *strstr_rs (const char *haystack, const char *needle);
> extern void setSaNameT (SaNameT *name, char *str);
> diff --git a/include/corosync/engine/coroapi.h b/include/corosync/engine/coroapi.h
> index 1b9dae2..d2abd12 100644
> --- a/include/corosync/engine/coroapi.h
> +++ b/include/corosync/engine/coroapi.h
> @@ -64,6 +64,8 @@ struct corosync_tpg_group {
> #define TOTEM_AGREED 0
> #define TOTEM_SAFE 1
>
> +#define MILLI_2_NANO_SECONDS 1000000ULL
> +
> #if !defined(TOTEM_IP_ADDRESS)
> struct totem_ip_address {
> unsigned int nodeid;
> @@ -98,6 +100,16 @@ enum corosync_flow_control_state {
> };
> #endif
>
> +typedef enum {
> + COROSYNC_FATAL_ERROR_EXIT = -1,
> + COROSYNC_LIBAIS_SOCKET = -6,
> + COROSYNC_LIBAIS_BIND = -7,
> + COROSYNC_READKEY = -8,
> + COROSYNC_INVALID_CONFIG = -9,
> + COROSYNC_DYNAMICLOAD = -12,
> + COROSYNC_OUT_OF_MEMORY = -15,
> + COROSYNC_FATAL_ERR = -16
> +} corosync_fatal_error_t;
>
> #ifndef OBJECT_PARENT_HANDLE
>
> @@ -440,6 +452,9 @@ struct corosync_api_v1 {
> struct iovec *iovec,
> int iov_len);
>
> + int (*sync_request) (
> + char *service_name);
> +
> /*
> * Plugin loading and unloading
> */
> @@ -469,6 +484,8 @@ struct corosync_api_v1 {
> * Error handling APIs
> */
> void (*error_memory_failure) (void);
> +#define corosync_fatal_error(err) api->fatal_error ((err), __FILE__, __LINE__)
> + void (*fatal_error) (corosync_fatal_error_t err, const char *file, unsigned int line);
> };
>
> #define SERVICE_ID_MAKE(a,b) ( ((a)<<16) | (b) )
> --
> _______________________________________________
> Openais mailing list
> Openais at lists.linux-foundation.org
> https://lists.linux-foundation.org/mailman/listinfo/openais
More information about the Openais
mailing list