requestqueue.c 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. // SPDX-License-Identifier: GPL-2.0-only
  2. /******************************************************************************
  3. *******************************************************************************
  4. **
  5. ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
  6. **
  7. **
  8. *******************************************************************************
  9. ******************************************************************************/
  10. #include "dlm_internal.h"
  11. #include "member.h"
  12. #include "lock.h"
  13. #include "dir.h"
  14. #include "config.h"
  15. #include "requestqueue.h"
  16. #include "util.h"
  17. struct rq_entry {
  18. struct list_head list;
  19. uint32_t recover_seq;
  20. int nodeid;
  21. struct dlm_message request;
  22. };
  23. /*
  24. * Requests received while the lockspace is in recovery get added to the
  25. * request queue and processed when recovery is complete. This happens when
  26. * the lockspace is suspended on some nodes before it is on others, or the
  27. * lockspace is enabled on some while still suspended on others.
  28. */
  29. void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
  30. const struct dlm_message *ms)
  31. {
  32. struct rq_entry *e;
  33. int length = le16_to_cpu(ms->m_header.h_length) -
  34. sizeof(struct dlm_message);
  35. e = kmalloc(sizeof(struct rq_entry) + length, GFP_ATOMIC);
  36. if (!e) {
  37. log_print("dlm_add_requestqueue: out of memory len %d", length);
  38. return;
  39. }
  40. e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
  41. e->nodeid = nodeid;
  42. memcpy(&e->request, ms, sizeof(*ms));
  43. memcpy(&e->request.m_extra, ms->m_extra, length);
  44. list_add_tail(&e->list, &ls->ls_requestqueue);
  45. }
  46. /*
  47. * Called by dlm_recoverd to process normal messages saved while recovery was
  48. * happening. Normal locking has been enabled before this is called. dlm_recv
  49. * upon receiving a message, will wait for all saved messages to be drained
  50. * here before processing the message it got. If a new dlm_ls_stop() arrives
  51. * while we're processing these saved messages, it may block trying to suspend
  52. * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue. In that
  53. * case, we don't abort since locking_stopped is still 0. If dlm_recv is not
  54. * waiting for us, then this processing may be aborted due to locking_stopped.
  55. */
  56. int dlm_process_requestqueue(struct dlm_ls *ls)
  57. {
  58. struct rq_entry *e;
  59. struct dlm_message *ms;
  60. int error = 0;
  61. write_lock_bh(&ls->ls_requestqueue_lock);
  62. for (;;) {
  63. if (list_empty(&ls->ls_requestqueue)) {
  64. clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
  65. error = 0;
  66. break;
  67. }
  68. e = list_first_entry(&ls->ls_requestqueue, struct rq_entry, list);
  69. ms = &e->request;
  70. log_limit(ls, "dlm_process_requestqueue msg %d from %d "
  71. "lkid %x remid %x result %d seq %u",
  72. le32_to_cpu(ms->m_type),
  73. le32_to_cpu(ms->m_header.h_nodeid),
  74. le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
  75. from_dlm_errno(le32_to_cpu(ms->m_result)),
  76. e->recover_seq);
  77. dlm_receive_message_saved(ls, &e->request, e->recover_seq);
  78. list_del(&e->list);
  79. kfree(e);
  80. if (dlm_locking_stopped(ls)) {
  81. log_debug(ls, "process_requestqueue abort running");
  82. error = -EINTR;
  83. break;
  84. }
  85. write_unlock_bh(&ls->ls_requestqueue_lock);
  86. schedule();
  87. write_lock_bh(&ls->ls_requestqueue_lock);
  88. }
  89. write_unlock_bh(&ls->ls_requestqueue_lock);
  90. return error;
  91. }
  92. static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
  93. {
  94. __le32 type = ms->m_type;
  95. /* the ls is being cleaned up and freed by release_lockspace */
  96. if (!atomic_read(&ls->ls_count))
  97. return 1;
  98. if (dlm_is_removed(ls, nodeid))
  99. return 1;
  100. /* directory operations are always purged because the directory is
  101. always rebuilt during recovery and the lookups resent */
  102. if (type == cpu_to_le32(DLM_MSG_REMOVE) ||
  103. type == cpu_to_le32(DLM_MSG_LOOKUP) ||
  104. type == cpu_to_le32(DLM_MSG_LOOKUP_REPLY))
  105. return 1;
  106. if (!dlm_no_directory(ls))
  107. return 0;
  108. return 1;
  109. }
  110. void dlm_purge_requestqueue(struct dlm_ls *ls)
  111. {
  112. struct dlm_message *ms;
  113. struct rq_entry *e, *safe;
  114. write_lock_bh(&ls->ls_requestqueue_lock);
  115. list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
  116. ms = &e->request;
  117. if (purge_request(ls, ms, e->nodeid)) {
  118. list_del(&e->list);
  119. kfree(e);
  120. }
  121. }
  122. write_unlock_bh(&ls->ls_requestqueue_lock);
  123. }