pacemaker 2.1.1-77db578727
Scalable High-Availability cluster resource manager
Loading...
Searching...
No Matches
cpg.c
Go to the documentation of this file.
1/*
2 * Copyright 2004-2020 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
11#include <bzlib.h>
12#include <sys/socket.h>
13#include <netinet/in.h>
14#include <arpa/inet.h>
15#include <netdb.h>
16
17#include <crm/common/ipc.h>
19#include <crm/common/mainloop.h>
20#include <sys/utsname.h>
21
22#include <qb/qbipc_common.h>
23#include <qb/qbipcc.h>
24#include <qb/qbutil.h>
25
26#include <corosync/corodefs.h>
27#include <corosync/corotypes.h>
28#include <corosync/hdb.h>
29#include <corosync/cpg.h>
30
31#include <crm/msg_xml.h>
32
33#include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
34#include "crmcluster_private.h"
35
36/* @TODO Once we can update the public API to require crm_cluster_t* in more
37 * functions, we can ditch this in favor of cluster->cpg_handle.
38 */
39static cpg_handle_t pcmk_cpg_handle = 0;
40
41// @TODO These could be moved to crm_cluster_t* at that time as well
42static bool cpg_evicted = false;
43static GList *cs_message_queue = NULL;
44static int cs_message_timer = 0;
45
46struct pcmk__cpg_host_s {
47 uint32_t id;
48 uint32_t pid;
49 gboolean local;
51 uint32_t size;
52 char uname[MAX_NAME];
53} __attribute__ ((packed));
54
55typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
56
57struct pcmk__cpg_msg_s {
58 struct qb_ipc_response_header header __attribute__ ((aligned(8)));
59 uint32_t id;
60 gboolean is_compressed;
61
64
65 uint32_t size;
66 uint32_t compressed_size;
67 /* 584 bytes */
68 char data[0];
69
70} __attribute__ ((packed));
71
72typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
73
74static void crm_cs_flush(gpointer data);
75
/* Number of payload bytes actually carried by a CPG message: the compressed
 * size if the payload is compressed, otherwise the plain size. The argument
 * is parenthesized so that any pointer expression (e.g. &local_msg) works.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
77
/* Evaluate a Corosync call, retrying while it reports a transient condition.
 *
 * rc       lvalue that receives the result of each attempt
 * counter  integer lvalue counting retries (caller initializes it, usually 0)
 * max      stop retrying once counter reaches this value
 * code     expression to evaluate (typically a cpg_*() call)
 *
 * After each CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL result the counter is
 * incremented and the caller sleeps for that many seconds, so the delay grows
 * with each retry. Any other result ends the loop immediately.
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
88
94void
96{
97 pcmk_cpg_handle = 0;
98 if (cluster->cpg_handle) {
99 crm_trace("Disconnecting CPG");
100 cpg_leave(cluster->cpg_handle, &cluster->group);
101 cpg_finalize(cluster->cpg_handle);
102 cluster->cpg_handle = 0;
103
104 } else {
105 crm_info("No CPG connection");
106 }
107}
108
116uint32_t
117get_local_nodeid(cpg_handle_t handle)
118{
119 cs_error_t rc = CS_OK;
120 int retries = 0;
121 static uint32_t local_nodeid = 0;
122 cpg_handle_t local_handle = handle;
123 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
124 int fd = -1;
125 uid_t found_uid = 0;
126 gid_t found_gid = 0;
127 pid_t found_pid = 0;
128 int rv;
129
130 if(local_nodeid != 0) {
131 return local_nodeid;
132 }
133
134 if(handle == 0) {
135 crm_trace("Creating connection");
136 cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
137 if (rc != CS_OK) {
138 crm_err("Could not connect to the CPG API: %s (%d)",
139 cs_strerror(rc), rc);
140 return 0;
141 }
142
143 rc = cpg_fd_get(local_handle, &fd);
144 if (rc != CS_OK) {
145 crm_err("Could not obtain the CPG API connection: %s (%d)",
146 cs_strerror(rc), rc);
147 goto bail;
148 }
149
150 /* CPG provider run as root (in given user namespace, anyway)? */
151 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
152 &found_uid, &found_gid))) {
153 crm_err("CPG provider is not authentic:"
154 " process %lld (uid: %lld, gid: %lld)",
155 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
156 (long long) found_uid, (long long) found_gid);
157 goto bail;
158 } else if (rv < 0) {
159 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
160 strerror(-rv), -rv);
161 goto bail;
162 }
163 }
164
165 if (rc == CS_OK) {
166 retries = 0;
167 crm_trace("Performing lookup");
168 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
169 }
170
171 if (rc != CS_OK) {
172 crm_err("Could not get local node id from the CPG API: %s (%d)",
173 pcmk__cs_err_str(rc), rc);
174 }
175
176bail:
177 if(handle == 0) {
178 crm_trace("Closing connection");
179 cpg_finalize(local_handle);
180 }
181 crm_debug("Local nodeid is %u", local_nodeid);
182 return local_nodeid;
183}
184
193static gboolean
194crm_cs_flush_cb(gpointer data)
195{
196 cs_message_timer = 0;
197 crm_cs_flush(data);
198 return FALSE;
199}
200
201// Send no more than this many CPG messages in one flush
202#define CS_SEND_MAX 200
203
210static void
211crm_cs_flush(gpointer data)
212{
213 unsigned int sent = 0;
214 guint queue_len = 0;
215 cs_error_t rc = 0;
216 cpg_handle_t *handle = (cpg_handle_t *) data;
217
218 if (*handle == 0) {
219 crm_trace("Connection is dead");
220 return;
221 }
222
223 queue_len = g_list_length(cs_message_queue);
224 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
225 crm_err("CPG queue has grown to %d", queue_len);
226
227 } else if (queue_len == CS_SEND_MAX) {
228 crm_warn("CPG queue has grown to %d", queue_len);
229 }
230
231 if (cs_message_timer != 0) {
232 /* There is already a timer, wait until it goes off */
233 crm_trace("Timer active %d", cs_message_timer);
234 return;
235 }
236
237 while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
238 struct iovec *iov = cs_message_queue->data;
239
240 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
241 if (rc != CS_OK) {
242 break;
243 }
244
245 sent++;
246 crm_trace("CPG message sent, size=%llu",
247 (unsigned long long) iov->iov_len);
248
249 cs_message_queue = g_list_remove(cs_message_queue, iov);
250 free(iov->iov_base);
251 free(iov);
252 }
253
254 queue_len -= sent;
255 if ((sent > 1) || (cs_message_queue != NULL)) {
256 crm_info("Sent %u CPG messages (%d remaining): %s (%d)",
257 sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
258 } else {
259 crm_trace("Sent %u CPG messages (%d remaining): %s (%d)",
260 sent, queue_len, pcmk__cs_err_str(rc), (int) rc);
261 }
262
263 if (cs_message_queue) {
264 uint32_t delay_ms = 100;
265 if (rc != CS_OK) {
266 /* Proportionally more if sending failed but cap at 1s */
267 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
268 }
269 cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
270 }
271}
272
281static int
282pcmk_cpg_dispatch(gpointer user_data)
283{
284 cs_error_t rc = CS_OK;
285 crm_cluster_t *cluster = (crm_cluster_t *) user_data;
286
287 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
288 if (rc != CS_OK) {
289 crm_err("Connection to the CPG API failed: %s (%d)",
290 pcmk__cs_err_str(rc), rc);
291 cpg_finalize(cluster->cpg_handle);
292 cluster->cpg_handle = 0;
293 return -1;
294
295 } else if (cpg_evicted) {
296 crm_err("Evicted from CPG membership");
297 return -1;
298 }
299 return 0;
300}
301
302static inline const char *
303ais_dest(const pcmk__cpg_host_t *host)
304{
305 if (host->local) {
306 return "local";
307 } else if (host->size > 0) {
308 return host->uname;
309 } else {
310 return "<all>";
311 }
312}
313
314static inline const char *
315msg_type2text(enum crm_ais_msg_types type)
316{
317 const char *text = "unknown";
318
319 switch (type) {
320 case crm_msg_none:
321 text = "unknown";
322 break;
323 case crm_msg_ais:
324 text = "ais";
325 break;
326 case crm_msg_cib:
327 text = "cib";
328 break;
329 case crm_msg_crmd:
330 text = "crmd";
331 break;
332 case crm_msg_pe:
333 text = "pengine";
334 break;
335 case crm_msg_te:
336 text = "tengine";
337 break;
338 case crm_msg_lrmd:
339 text = "lrmd";
340 break;
341 case crm_msg_attrd:
342 text = "attrd";
343 break;
344 case crm_msg_stonithd:
345 text = "stonithd";
346 break;
348 text = "stonith-ng";
349 break;
350 }
351 return text;
352}
353
362static bool
363check_message_sanity(const pcmk__cpg_msg_t *msg)
364{
365 int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
366
367 if (payload_size < 1) {
368 crm_err("%sCPG message %d from %s invalid: "
369 "Claimed size of %d bytes is too small "
370 CRM_XS " from %s[%u] to %s@%s",
371 (msg->is_compressed? "Compressed " : ""),
372 msg->id, ais_dest(&(msg->sender)),
373 (int) msg->header.size,
374 msg_type2text(msg->sender.type), msg->sender.pid,
375 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
376 return false;
377 }
378
379 if (msg->header.error != CS_OK) {
380 crm_err("%sCPG message %d from %s invalid: "
381 "Sender indicated error %d "
382 CRM_XS " from %s[%u] to %s@%s",
383 (msg->is_compressed? "Compressed " : ""),
384 msg->id, ais_dest(&(msg->sender)),
385 msg->header.error,
386 msg_type2text(msg->sender.type), msg->sender.pid,
387 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
388 return false;
389 }
390
391 if (msg_data_len(msg) != payload_size) {
392 crm_err("%sCPG message %d from %s invalid: "
393 "Total size %d inconsistent with payload size %d "
394 CRM_XS " from %s[%u] to %s@%s",
395 (msg->is_compressed? "Compressed " : ""),
396 msg->id, ais_dest(&(msg->sender)),
397 (int) msg->header.size, (int) msg_data_len(msg),
398 msg_type2text(msg->sender.type), msg->sender.pid,
399 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
400 return false;
401 }
402
403 if (!msg->is_compressed &&
404 /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
405 * but checking the last byte or two should be quick
406 */
407 (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
408 || (msg->data[msg->size - 1] != '\0'))) {
409 crm_err("CPG message %d from %s invalid: "
410 "Payload does not end at byte %llu "
411 CRM_XS " from %s[%u] to %s@%s",
412 msg->id, ais_dest(&(msg->sender)),
413 (unsigned long long) msg->size,
414 msg_type2text(msg->sender.type), msg->sender.pid,
415 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
416 return false;
417 }
418
419 crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
420 (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
421 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
422 ais_dest(&(msg->sender)),
423 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
424 return true;
425}
426
443char *
444pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
445 uint32_t *kind, const char **from)
446{
447 char *data = NULL;
448 pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
449
450 if(handle) {
451 // Do filtering and field massaging
452 uint32_t local_nodeid = get_local_nodeid(handle);
453 const char *local_name = get_local_node_name();
454
455 if (msg->sender.id > 0 && msg->sender.id != nodeid) {
456 crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
457 return NULL;
458
459 } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
460 /* Not for us */
461 crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
462 return NULL;
463 } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
464 /* Not for us */
465 crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
466 return NULL;
467 }
468
469 msg->sender.id = nodeid;
470 if (msg->sender.size == 0) {
471 crm_node_t *peer = crm_get_peer(nodeid, NULL);
472
473 if (peer == NULL) {
474 crm_err("Peer with nodeid=%u is unknown", nodeid);
475
476 } else if (peer->uname == NULL) {
477 crm_err("No uname for peer with nodeid=%u", nodeid);
478
479 } else {
480 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
481 msg->sender.size = strlen(peer->uname);
482 memset(msg->sender.uname, 0, MAX_NAME);
483 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
484 }
485 }
486 }
487
488 crm_trace("Got new%s message (size=%d, %d, %d)",
489 msg->is_compressed ? " compressed" : "",
490 msg_data_len(msg), msg->size, msg->compressed_size);
491
492 if (kind != NULL) {
493 *kind = msg->header.id;
494 }
495 if (from != NULL) {
496 *from = msg->sender.uname;
497 }
498
499 if (msg->is_compressed && msg->size > 0) {
500 int rc = BZ_OK;
501 char *uncompressed = NULL;
502 unsigned int new_size = msg->size + 1;
503
504 if (!check_message_sanity(msg)) {
505 goto badmsg;
506 }
507
508 crm_trace("Decompressing message data");
509 uncompressed = calloc(1, new_size);
510 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
511
512 if (rc != BZ_OK) {
513 crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
514 bz2_strerror(rc), rc);
515 free(uncompressed);
516 goto badmsg;
517 }
518
519 CRM_ASSERT(rc == BZ_OK);
520 CRM_ASSERT(new_size == msg->size);
521
522 data = uncompressed;
523
524 } else if (!check_message_sanity(msg)) {
525 goto badmsg;
526
527 } else {
528 data = strdup(msg->data);
529 }
530
531 // Is this necessary?
532 crm_get_peer(msg->sender.id, msg->sender.uname);
533
534 crm_trace("Payload: %.200s", data);
535 return data;
536
537 badmsg:
538 crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
539 " min=%d, total=%d, size=%d, bz2_size=%d",
540 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
541 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
542 msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
543 msg->header.size, msg->size, msg->compressed_size);
544
545 free(data);
546 return NULL;
547}
548
560static int
561cmp_member_list_nodeid(const void *first, const void *second)
562{
563 const struct cpg_address *const a = *((const struct cpg_address **) first),
564 *const b = *((const struct cpg_address **) second);
565 if (a->nodeid < b->nodeid) {
566 return -1;
567 } else if (a->nodeid > b->nodeid) {
568 return 1;
569 }
570 /* don't bother with "reason" nor "pid" */
571 return 0;
572}
573
582static const char *
583cpgreason2str(cpg_reason_t reason)
584{
585 switch (reason) {
586 case CPG_REASON_JOIN: return " via cpg_join";
587 case CPG_REASON_LEAVE: return " via cpg_leave";
588 case CPG_REASON_NODEDOWN: return " via cluster exit";
589 case CPG_REASON_NODEUP: return " via cluster join";
590 case CPG_REASON_PROCDOWN: return " for unknown reason";
591 default: break;
592 }
593 return "";
594}
595
604static inline const char *
605peer_name(crm_node_t *peer)
606{
607 if (peer == NULL) {
608 return "unknown node";
609 } else if (peer->uname == NULL) {
610 return "peer node";
611 } else {
612 return peer->uname;
613 }
614}
615
628void
629pcmk_cpg_membership(cpg_handle_t handle,
630 const struct cpg_name *groupName,
631 const struct cpg_address *member_list, size_t member_list_entries,
632 const struct cpg_address *left_list, size_t left_list_entries,
633 const struct cpg_address *joined_list, size_t joined_list_entries)
634{
635 int i;
636 gboolean found = FALSE;
637 static int counter = 0;
638 uint32_t local_nodeid = get_local_nodeid(handle);
639 const struct cpg_address *key, **sorted;
640
641 sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
642 CRM_ASSERT(sorted != NULL);
643
644 for (size_t iter = 0; iter < member_list_entries; iter++) {
645 sorted[iter] = member_list + iter;
646 }
647 /* so that the cross-matching multiply-subscribed nodes is then cheap */
648 qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
649 cmp_member_list_nodeid);
650
651 for (i = 0; i < left_list_entries; i++) {
652 crm_node_t *peer = pcmk__search_cluster_node_cache(left_list[i].nodeid,
653 NULL);
654 const struct cpg_address **rival = NULL;
655
656 /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
657 and not playing by this rule may go wild in case of multiple
658 residual instances of the same pacemaker daemon at the same node
659 -- we must ensure that the possible local rival(s) won't make us
660 cry out and bail (e.g. when they quit themselves), since all the
661 surrounding logic denies this simple fact that the full membership
662 is discriminated also per the PID of the process beside mere node
663 ID (and implicitly, group ID); practically, this will be sound in
664 terms of not preventing progress, since all the CPG joiners are
665 also API end-point carriers, and that's what matters locally
666 (who's the winner);
667 remotely, we will just compare leave_list and member_list and if
668 the left process has its node retained in member_list (under some
669 other PID, anyway) we will just ignore it as well
670 XXX: long-term fix is to establish in-out PID-aware tracking? */
671 if (peer) {
672 key = &left_list[i];
673 rival = bsearch(&key, sorted, member_list_entries,
674 sizeof(const struct cpg_address *),
675 cmp_member_list_nodeid);
676 }
677
678 if (rival == NULL) {
679 crm_info("Group %s event %d: %s (node %u pid %u) left%s",
680 groupName->value, counter, peer_name(peer),
681 left_list[i].nodeid, left_list[i].pid,
682 cpgreason2str(left_list[i].reason));
683 if (peer) {
684 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
686 }
687 } else if (left_list[i].nodeid == local_nodeid) {
688 crm_warn("Group %s event %d: duplicate local pid %u left%s",
689 groupName->value, counter,
690 left_list[i].pid, cpgreason2str(left_list[i].reason));
691 } else {
692 crm_warn("Group %s event %d: "
693 "%s (node %u) duplicate pid %u left%s (%u remains)",
694 groupName->value, counter, peer_name(peer),
695 left_list[i].nodeid, left_list[i].pid,
696 cpgreason2str(left_list[i].reason), (*rival)->pid);
697 }
698 }
699 free(sorted);
700 sorted = NULL;
701
702 for (i = 0; i < joined_list_entries; i++) {
703 crm_info("Group %s event %d: node %u pid %u joined%s",
704 groupName->value, counter, joined_list[i].nodeid,
705 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
706 }
707
708 for (i = 0; i < member_list_entries; i++) {
709 crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
710
711 if (member_list[i].nodeid == local_nodeid
712 && member_list[i].pid != getpid()) {
713 /* see the note above */
714 crm_warn("Group %s event %d: detected duplicate local pid %u",
715 groupName->value, counter, member_list[i].pid);
716 continue;
717 }
718 crm_info("Group %s event %d: %s (node %u pid %u) is member",
719 groupName->value, counter, peer_name(peer),
720 member_list[i].nodeid, member_list[i].pid);
721
722 /* If the caller left auto-reaping enabled, this will also update the
723 * state to member.
724 */
725 peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
727
728 if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
729 /* The node is a CPG member, but we currently think it's not a
730 * cluster member. This is possible only if auto-reaping was
731 * disabled. The node may be joining, and we happened to get the CPG
732 * notification before the quorum notification; or the node may have
733 * just died, and we are processing its final messages; or a bug
734 * has affected the peer cache.
735 */
736 time_t now = time(NULL);
737
738 if (peer->when_lost == 0) {
739 // Track when we first got into this contradictory state
740 peer->when_lost = now;
741
742 } else if (now > (peer->when_lost + 60)) {
743 // If it persists for more than a minute, update the state
744 crm_warn("Node %u is member of group %s but was believed offline",
745 member_list[i].nodeid, groupName->value);
746 pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
747 }
748 }
749
750 if (local_nodeid == member_list[i].nodeid) {
751 found = TRUE;
752 }
753 }
754
755 if (!found) {
756 crm_err("Local node was evicted from group %s", groupName->value);
757 cpg_evicted = true;
758 }
759
760 counter++;
761}
762
770gboolean
772{
773 cs_error_t rc;
774 int fd = -1;
775 int retries = 0;
776 uint32_t id = 0;
777 crm_node_t *peer = NULL;
778 cpg_handle_t handle = 0;
779 const char *message_name = pcmk__message_name(crm_system_name);
780 uid_t found_uid = 0;
781 gid_t found_gid = 0;
782 pid_t found_pid = 0;
783 int rv;
784
785 struct mainloop_fd_callbacks cpg_fd_callbacks = {
786 .dispatch = pcmk_cpg_dispatch,
787 .destroy = cluster->destroy,
788 };
789
790 cpg_model_v1_data_t cpg_model_info = {
791 .model = CPG_MODEL_V1,
792 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
793 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
794 .cpg_totem_confchg_fn = NULL,
795 .flags = 0,
796 };
797
798 cpg_evicted = false;
799 cluster->group.length = 0;
800 cluster->group.value[0] = 0;
801
802 /* group.value is char[128] */
803 strncpy(cluster->group.value, message_name, 127);
804 cluster->group.value[127] = 0;
805 cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
806
807 cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
808 if (rc != CS_OK) {
809 crm_err("Could not connect to the CPG API: %s (%d)",
810 cs_strerror(rc), rc);
811 goto bail;
812 }
813
814 rc = cpg_fd_get(handle, &fd);
815 if (rc != CS_OK) {
816 crm_err("Could not obtain the CPG API connection: %s (%d)",
817 cs_strerror(rc), rc);
818 goto bail;
819 }
820
821 /* CPG provider run as root (in given user namespace, anyway)? */
822 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
823 &found_uid, &found_gid))) {
824 crm_err("CPG provider is not authentic:"
825 " process %lld (uid: %lld, gid: %lld)",
826 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
827 (long long) found_uid, (long long) found_gid);
828 rc = CS_ERR_ACCESS;
829 goto bail;
830 } else if (rv < 0) {
831 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
832 strerror(-rv), -rv);
833 rc = CS_ERR_ACCESS;
834 goto bail;
835 }
836
837 id = get_local_nodeid(handle);
838 if (id == 0) {
839 crm_err("Could not get local node id from the CPG API");
840 goto bail;
841
842 }
843 cluster->nodeid = id;
844
845 retries = 0;
846 cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
847 if (rc != CS_OK) {
848 crm_err("Could not join the CPG group '%s': %d", message_name, rc);
849 goto bail;
850 }
851
852 pcmk_cpg_handle = handle;
853 cluster->cpg_handle = handle;
854 mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
855
856 bail:
857 if (rc != CS_OK) {
858 cpg_finalize(handle);
859 return FALSE;
860 }
861
862 peer = crm_get_peer(id, NULL);
864 return TRUE;
865}
866
877gboolean
878pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
879{
880 gboolean rc = TRUE;
881 char *data = NULL;
882
884 rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
885 free(data);
886 return rc;
887}
888
901gboolean
902send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
903 gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
904{
905 static int msg_id = 0;
906 static int local_pid = 0;
907 static int local_name_len = 0;
908 static const char *local_name = NULL;
909
910 char *target = NULL;
911 struct iovec *iov;
912 pcmk__cpg_msg_t *msg = NULL;
914
915 switch (msg_class) {
917 break;
918 default:
919 crm_err("Invalid message class: %d", msg_class);
920 return FALSE;
921 }
922
923 CRM_CHECK(dest != crm_msg_ais, return FALSE);
924
925 if (local_name == NULL) {
926 local_name = get_local_node_name();
927 }
928 if ((local_name_len == 0) && (local_name != NULL)) {
929 local_name_len = strlen(local_name);
930 }
931
932 if (data == NULL) {
933 data = "";
934 }
935
936 if (local_pid == 0) {
937 local_pid = getpid();
938 }
939
940 if (sender == crm_msg_none) {
941 sender = local_pid;
942 }
943
944 msg = calloc(1, sizeof(pcmk__cpg_msg_t));
945
946 msg_id++;
947 msg->id = msg_id;
948 msg->header.id = msg_class;
949 msg->header.error = CS_OK;
950
951 msg->host.type = dest;
952 msg->host.local = local;
953
954 if (node) {
955 if (node->uname) {
956 target = strdup(node->uname);
957 msg->host.size = strlen(node->uname);
958 memset(msg->host.uname, 0, MAX_NAME);
959 memcpy(msg->host.uname, node->uname, msg->host.size);
960 } else {
961 target = crm_strdup_printf("%u", node->id);
962 }
963 msg->host.id = node->id;
964 } else {
965 target = strdup("all");
966 }
967
968 msg->sender.id = 0;
969 msg->sender.type = sender;
970 msg->sender.pid = local_pid;
971 msg->sender.size = local_name_len;
972 memset(msg->sender.uname, 0, MAX_NAME);
973 if ((local_name != NULL) && (msg->sender.size != 0)) {
974 memcpy(msg->sender.uname, local_name, msg->sender.size);
975 }
976
977 msg->size = 1 + strlen(data);
978 msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
979
980 if (msg->size < CRM_BZ2_THRESHOLD) {
981 msg = pcmk__realloc(msg, msg->header.size);
982 memcpy(msg->data, data, msg->size);
983
984 } else {
985 char *compressed = NULL;
986 unsigned int new_size = 0;
987 char *uncompressed = strdup(data);
988
989 if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
990 &compressed, &new_size) == pcmk_rc_ok) {
991
992 msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
993 msg = pcmk__realloc(msg, msg->header.size);
994 memcpy(msg->data, compressed, new_size);
995
996 msg->is_compressed = TRUE;
997 msg->compressed_size = new_size;
998
999 } else {
1000 // cppcheck seems not to understand the abort logic in pcmk__realloc
1001 // cppcheck-suppress memleak
1002 msg = pcmk__realloc(msg, msg->header.size);
1003 memcpy(msg->data, data, msg->size);
1004 }
1005
1006 free(uncompressed);
1007 free(compressed);
1008 }
1009
1010 iov = calloc(1, sizeof(struct iovec));
1011 iov->iov_base = msg;
1012 iov->iov_len = msg->header.size;
1013
1014 if (msg->compressed_size) {
1015 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1016 msg->id, target, (unsigned long long) iov->iov_len,
1017 msg->compressed_size, data);
1018 } else {
1019 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1020 msg->id, target, (unsigned long long) iov->iov_len,
1021 msg->size, data);
1022 }
1023 free(target);
1024
1025 cs_message_queue = g_list_append(cs_message_queue, iov);
1026 crm_cs_flush(&pcmk_cpg_handle);
1027
1028 return TRUE;
1029}
1030
1039text2msg_type(const char *text)
1040{
1041 int type = crm_msg_none;
1042
1043 CRM_CHECK(text != NULL, return type);
1044 text = pcmk__message_name(text);
1045 if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1046 type = crm_msg_ais;
1047 } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1048 type = crm_msg_cib;
1049 } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1051 } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1052 type = crm_msg_te;
1053 } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1054 type = crm_msg_pe;
1055 } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1057 } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1059 } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1061 } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1063
1064 } else {
1065 /* This will normally be a transient client rather than
1066 * a cluster daemon. Set the type to the pid of the client
1067 */
1068 int scan_rc = sscanf(text, "%d", &type);
1069
1070 if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1071 /* Ensure it's sane */
1073 }
1074 }
1075 return type;
1076}
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition membership.c:876
crm_node_t * pcmk__search_cluster_node_cache(unsigned int id, const char *uname)
Definition membership.c:562
@ crm_proc_cpg
Definition internal.h:21
crm_node_t * pcmk__update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
crm_ais_msg_types
Definition cluster.h:103
@ crm_msg_stonithd
Definition cluster.h:110
@ crm_msg_none
Definition cluster.h:104
@ crm_msg_cib
Definition cluster.h:107
@ crm_msg_pe
Definition cluster.h:112
@ crm_msg_attrd
Definition cluster.h:109
@ crm_msg_ais
Definition cluster.h:105
@ crm_msg_te
Definition cluster.h:111
@ crm_msg_stonith_ng
Definition cluster.h:113
@ crm_msg_crmd
Definition cluster.h:108
@ crm_msg_lrmd
Definition cluster.h:106
const char * get_local_node_name(void)
Get the local node's name.
Definition cluster.c:155
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Get a cluster node cache entry.
Definition membership.c:702
#define CRM_NODE_MEMBER
Definition cluster.h:33
crm_ais_msg_class
Definition cluster.h:99
@ crm_class_cluster
Definition cluster.h:100
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition messages.c:182
struct tcp_async_cb_data __attribute__
#define ONLINESTATUS
Definition util.h:38
#define OFFLINESTATUS
Definition util.h:39
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
uint32_t compressed_size
Definition cpg.c:8
pcmk__cpg_host_t host
Definition cpg.c:4
gboolean pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
Definition cpg.c:878
pcmk__cpg_host_t sender
Definition cpg.c:5
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Handle a CPG configuration change event.
Definition cpg.c:629
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Disconnect from Corosync CPG.
Definition cpg.c:95
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
Definition cpg.c:72
#define CS_SEND_MAX
Definition cpg.c:202
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition cpg.c:902
#define msg_data_len(msg)
Definition cpg.c:76
enum crm_ais_msg_types type
Definition cpg.c:3
char uname[MAX_NAME]
Definition cpg.c:5
char data[0]
Definition cpg.c:10
uint32_t get_local_nodeid(cpg_handle_t handle)
Get the local Corosync node ID (via CPG)
Definition cpg.c:117
uint32_t size
Definition cpg.c:4
uint32_t id
Definition cpg.c:0
gboolean is_compressed
Definition cpg.c:2
enum crm_ais_msg_types text2msg_type(const char *text)
Get the message type equivalent of a string.
Definition cpg.c:1039
gboolean local
Definition cpg.c:2
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Extract text data from a Corosync CPG message.
Definition cpg.c:444
#define cs_repeat(rc, counter, max, code)
Definition cpg.c:78
uint32_t pid
Definition cpg.c:1
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Connect to Corosync CPG.
Definition cpg.c:771
struct pcmk__cpg_host_s pcmk__cpg_host_t
Definition cpg.c:55
#define CRM_SYSTEM_CIB
Definition crm.h:105
#define CRM_SYSTEM_CRMD
Definition crm.h:106
#define CRM_SYSTEM_DC
Definition crm.h:102
#define CRM_SYSTEM_STONITHD
Definition crm.h:110
#define CRM_SYSTEM_LRMD
Definition crm.h:107
#define CRM_SYSTEM_TENGINE
Definition crm.h:109
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
Definition crm.h:76
char * crm_system_name
Definition utils.c:54
#define CRM_SYSTEM_PENGINE
Definition crm.h:108
IPC interface to Pacemaker daemons.
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)
#define PCMK__SPECIAL_PID_AS_0(p)
#define crm_info(fmt, args...)
Definition logging.h:353
#define crm_warn(fmt, args...)
Definition logging.h:351
#define CRM_XS
Definition logging.h:54
#define crm_notice(fmt, args...)
Definition logging.h:352
#define CRM_CHECK(expr, failure_action)
Definition logging.h:218
#define crm_debug(fmt, args...)
Definition logging.h:355
#define crm_err(fmt, args...)
Definition logging.h:350
#define crm_trace(fmt, args...)
Definition logging.h:356
Wrappers for and extensions to glib mainloop.
#define G_PRIORITY_MEDIUM
Definition mainloop.h:181
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition mainloop.c:975
int rc
Definition pcmk_fence.c:35
const char * target
Definition pcmk_fence.c:29
char * strerror(int errnum)
const char * bz2_strerror(int rc)
Definition results.c:726
#define CRM_ASSERT(expr)
Definition results.h:42
@ pcmk_rc_ok
Definition results.h:142
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition strings.c:748
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
Definition strings.c:955
@ pcmk__str_casei
uint32_t nodeid
Definition cluster.h:80
void(* destroy)(gpointer)
Definition cluster.h:82
char * uname
Definition cluster.h:53
uint32_t id
Definition cluster.h:66
char * state
Definition cluster.h:55
time_t when_lost
Definition cluster.h:67
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
Definition mainloop.h:137
char * dump_xml_unformatted(xmlNode *msg)
Definition xml.c:2017
#define CRM_BZ2_THRESHOLD
Definition xml.h:47