rcom.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691
  1. // SPDX-License-Identifier: GPL-2.0-only
  2. /******************************************************************************
  3. *******************************************************************************
  4. **
  5. ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
  6. ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
  7. **
  8. **
  9. *******************************************************************************
  10. ******************************************************************************/
  11. #include "dlm_internal.h"
  12. #include "lockspace.h"
  13. #include "member.h"
  14. #include "lowcomms.h"
  15. #include "midcomms.h"
  16. #include "rcom.h"
  17. #include "recover.h"
  18. #include "dir.h"
  19. #include "config.h"
  20. #include "memory.h"
  21. #include "lock.h"
  22. #include "util.h"
  23. static int rcom_response(struct dlm_ls *ls)
  24. {
  25. return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
  26. }
  27. static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  28. struct dlm_rcom **rc_ret, char *mb, int mb_len,
  29. uint64_t seq)
  30. {
  31. struct dlm_rcom *rc;
  32. rc = (struct dlm_rcom *) mb;
  33. rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  34. rc->rc_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
  35. rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
  36. rc->rc_header.h_length = cpu_to_le16(mb_len);
  37. rc->rc_header.h_cmd = DLM_RCOM;
  38. rc->rc_type = cpu_to_le32(type);
  39. rc->rc_seq = cpu_to_le64(seq);
  40. *rc_ret = rc;
  41. }
  42. static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  43. struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret,
  44. uint64_t seq)
  45. {
  46. int mb_len = sizeof(struct dlm_rcom) + len;
  47. struct dlm_mhandle *mh;
  48. char *mb;
  49. mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
  50. if (!mh) {
  51. log_print("%s to %d type %d len %d ENOBUFS",
  52. __func__, to_nodeid, type, len);
  53. return -ENOBUFS;
  54. }
  55. _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
  56. *mh_ret = mh;
  57. return 0;
  58. }
  59. static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
  60. int len, struct dlm_rcom **rc_ret,
  61. struct dlm_msg **msg_ret, uint64_t seq)
  62. {
  63. int mb_len = sizeof(struct dlm_rcom) + len;
  64. struct dlm_msg *msg;
  65. char *mb;
  66. msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, &mb, NULL, NULL);
  67. if (!msg) {
  68. log_print("create_rcom to %d type %d len %d ENOBUFS",
  69. to_nodeid, type, len);
  70. return -ENOBUFS;
  71. }
  72. _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
  73. *msg_ret = msg;
  74. return 0;
  75. }
  76. static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
  77. {
  78. dlm_midcomms_commit_mhandle(mh, NULL, 0);
  79. }
  80. static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
  81. {
  82. dlm_lowcomms_commit_msg(msg);
  83. dlm_lowcomms_put_msg(msg);
  84. }
  85. static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
  86. uint32_t flags)
  87. {
  88. rs->rs_flags = cpu_to_le32(flags);
  89. }
  90. /* When replying to a status request, a node also sends back its
  91. configuration values. The requesting node then checks that the remote
  92. node is configured the same way as itself. */
  93. static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
  94. uint32_t num_slots)
  95. {
  96. rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
  97. rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
  98. rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
  99. rf->rf_num_slots = cpu_to_le16(num_slots);
  100. rf->rf_generation = cpu_to_le32(ls->ls_generation);
  101. }
  102. static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
  103. {
  104. struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
  105. if ((le32_to_cpu(rc->rc_header.h_version) & 0xFFFF0000) != DLM_HEADER_MAJOR) {
  106. log_error(ls, "version mismatch: %x nodeid %d: %x",
  107. DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
  108. le32_to_cpu(rc->rc_header.h_version));
  109. return -EPROTO;
  110. }
  111. if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
  112. le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
  113. log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
  114. ls->ls_lvblen, ls->ls_exflags, nodeid,
  115. le32_to_cpu(rf->rf_lvblen),
  116. le32_to_cpu(rf->rf_lsflags));
  117. return -EPROTO;
  118. }
  119. return 0;
  120. }
  121. static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
  122. {
  123. spin_lock_bh(&ls->ls_rcom_spin);
  124. *new_seq = cpu_to_le64(++ls->ls_rcom_seq);
  125. set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
  126. spin_unlock_bh(&ls->ls_rcom_spin);
  127. }
  128. static void disallow_sync_reply(struct dlm_ls *ls)
  129. {
  130. spin_lock_bh(&ls->ls_rcom_spin);
  131. clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
  132. clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
  133. spin_unlock_bh(&ls->ls_rcom_spin);
  134. }
  135. /*
  136. * low nodeid gathers one slot value at a time from each node.
  137. * it sets need_slots=0, and saves rf_our_slot returned from each
  138. * rcom_config.
  139. *
  140. * other nodes gather all slot values at once from the low nodeid.
  141. * they set need_slots=1, and ignore the rf_our_slot returned from each
  142. * rcom_config. they use the rf_num_slots returned from the low
  143. * node's rcom_config.
  144. */
  145. int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
  146. uint64_t seq)
  147. {
  148. struct dlm_rcom *rc;
  149. struct dlm_msg *msg;
  150. int error = 0;
  151. ls->ls_recover_nodeid = nodeid;
  152. if (nodeid == dlm_our_nodeid()) {
  153. rc = ls->ls_recover_buf;
  154. rc->rc_result = cpu_to_le32(dlm_recover_status(ls));
  155. goto out;
  156. }
  157. retry:
  158. error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
  159. sizeof(struct rcom_status), &rc, &msg,
  160. seq);
  161. if (error)
  162. goto out;
  163. set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
  164. allow_sync_reply(ls, &rc->rc_id);
  165. memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
  166. send_rcom_stateless(msg, rc);
  167. error = dlm_wait_function(ls, &rcom_response);
  168. disallow_sync_reply(ls);
  169. if (error == -ETIMEDOUT)
  170. goto retry;
  171. if (error)
  172. goto out;
  173. rc = ls->ls_recover_buf;
  174. if (rc->rc_result == cpu_to_le32(-ESRCH)) {
  175. /* we pretend the remote lockspace exists with 0 status */
  176. log_debug(ls, "remote node %d not ready", nodeid);
  177. rc->rc_result = 0;
  178. error = 0;
  179. } else {
  180. error = check_rcom_config(ls, rc, nodeid);
  181. }
  182. /* the caller looks at rc_result for the remote recovery status */
  183. out:
  184. return error;
  185. }
  186. static void receive_rcom_status(struct dlm_ls *ls,
  187. const struct dlm_rcom *rc_in,
  188. uint64_t seq)
  189. {
  190. struct dlm_rcom *rc;
  191. struct rcom_status *rs;
  192. uint32_t status;
  193. int nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
  194. int len = sizeof(struct rcom_config);
  195. struct dlm_msg *msg;
  196. int num_slots = 0;
  197. int error;
  198. if (!dlm_slots_version(&rc_in->rc_header)) {
  199. status = dlm_recover_status(ls);
  200. goto do_create;
  201. }
  202. rs = (struct rcom_status *)rc_in->rc_buf;
  203. if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
  204. status = dlm_recover_status(ls);
  205. goto do_create;
  206. }
  207. spin_lock_bh(&ls->ls_recover_lock);
  208. status = ls->ls_recover_status;
  209. num_slots = ls->ls_num_slots;
  210. spin_unlock_bh(&ls->ls_recover_lock);
  211. len += num_slots * sizeof(struct rcom_slot);
  212. do_create:
  213. error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
  214. len, &rc, &msg, seq);
  215. if (error)
  216. return;
  217. rc->rc_id = rc_in->rc_id;
  218. rc->rc_seq_reply = rc_in->rc_seq;
  219. rc->rc_result = cpu_to_le32(status);
  220. set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
  221. if (!num_slots)
  222. goto do_send;
  223. spin_lock_bh(&ls->ls_recover_lock);
  224. if (ls->ls_num_slots != num_slots) {
  225. spin_unlock_bh(&ls->ls_recover_lock);
  226. log_debug(ls, "receive_rcom_status num_slots %d to %d",
  227. num_slots, ls->ls_num_slots);
  228. rc->rc_result = 0;
  229. set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
  230. goto do_send;
  231. }
  232. dlm_slots_copy_out(ls, rc);
  233. spin_unlock_bh(&ls->ls_recover_lock);
  234. do_send:
  235. send_rcom_stateless(msg, rc);
  236. }
  237. static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
  238. {
  239. spin_lock_bh(&ls->ls_rcom_spin);
  240. if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
  241. le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
  242. log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
  243. le32_to_cpu(rc_in->rc_type),
  244. le32_to_cpu(rc_in->rc_header.h_nodeid),
  245. (unsigned long long)le64_to_cpu(rc_in->rc_id),
  246. (unsigned long long)ls->ls_rcom_seq);
  247. goto out;
  248. }
  249. memcpy(ls->ls_recover_buf, rc_in,
  250. le16_to_cpu(rc_in->rc_header.h_length));
  251. set_bit(LSFL_RCOM_READY, &ls->ls_flags);
  252. clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
  253. wake_up(&ls->ls_wait_general);
  254. out:
  255. spin_unlock_bh(&ls->ls_rcom_spin);
  256. }
  257. int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
  258. int last_len, uint64_t seq)
  259. {
  260. struct dlm_mhandle *mh;
  261. struct dlm_rcom *rc;
  262. int error = 0;
  263. ls->ls_recover_nodeid = nodeid;
  264. retry:
  265. error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len,
  266. &rc, &mh, seq);
  267. if (error)
  268. goto out;
  269. memcpy(rc->rc_buf, last_name, last_len);
  270. allow_sync_reply(ls, &rc->rc_id);
  271. memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
  272. send_rcom(mh, rc);
  273. error = dlm_wait_function(ls, &rcom_response);
  274. disallow_sync_reply(ls);
  275. if (error == -ETIMEDOUT)
  276. goto retry;
  277. out:
  278. return error;
  279. }
  280. static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
  281. uint64_t seq)
  282. {
  283. struct dlm_mhandle *mh;
  284. struct dlm_rcom *rc;
  285. int error, inlen, outlen, nodeid;
  286. nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
  287. inlen = le16_to_cpu(rc_in->rc_header.h_length) -
  288. sizeof(struct dlm_rcom);
  289. outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
  290. error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
  291. &rc, &mh, seq);
  292. if (error)
  293. return;
  294. rc->rc_id = rc_in->rc_id;
  295. rc->rc_seq_reply = rc_in->rc_seq;
  296. dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
  297. nodeid);
  298. send_rcom(mh, rc);
  299. }
  300. int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq)
  301. {
  302. struct dlm_rcom *rc;
  303. struct dlm_mhandle *mh;
  304. struct dlm_ls *ls = r->res_ls;
  305. int error;
  306. error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
  307. &rc, &mh, seq);
  308. if (error)
  309. goto out;
  310. memcpy(rc->rc_buf, r->res_name, r->res_length);
  311. rc->rc_id = cpu_to_le64(r->res_id);
  312. send_rcom(mh, rc);
  313. out:
  314. return error;
  315. }
  316. static void receive_rcom_lookup(struct dlm_ls *ls,
  317. const struct dlm_rcom *rc_in, uint64_t seq)
  318. {
  319. struct dlm_rcom *rc;
  320. struct dlm_mhandle *mh;
  321. int error, ret_nodeid, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
  322. int len = le16_to_cpu(rc_in->rc_header.h_length) -
  323. sizeof(struct dlm_rcom);
  324. /* Old code would send this special id to trigger a debug dump. */
  325. if (rc_in->rc_id == cpu_to_le64(0xFFFFFFFF)) {
  326. log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
  327. dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
  328. return;
  329. }
  330. error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh,
  331. seq);
  332. if (error)
  333. return;
  334. error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
  335. DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
  336. if (error)
  337. ret_nodeid = error;
  338. rc->rc_result = cpu_to_le32(ret_nodeid);
  339. rc->rc_id = rc_in->rc_id;
  340. rc->rc_seq_reply = rc_in->rc_seq;
  341. send_rcom(mh, rc);
  342. }
  343. static void receive_rcom_lookup_reply(struct dlm_ls *ls,
  344. const struct dlm_rcom *rc_in)
  345. {
  346. dlm_recover_master_reply(ls, rc_in);
  347. }
  348. static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
  349. struct rcom_lock *rl)
  350. {
  351. memset(rl, 0, sizeof(*rl));
  352. rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
  353. rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
  354. rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
  355. rl->rl_flags = cpu_to_le32(dlm_dflags_val(lkb));
  356. rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
  357. rl->rl_rqmode = lkb->lkb_rqmode;
  358. rl->rl_grmode = lkb->lkb_grmode;
  359. rl->rl_status = lkb->lkb_status;
  360. rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
  361. if (lkb->lkb_bastfn)
  362. rl->rl_asts |= DLM_CB_BAST;
  363. if (lkb->lkb_astfn)
  364. rl->rl_asts |= DLM_CB_CAST;
  365. rl->rl_namelen = cpu_to_le16(r->res_length);
  366. memcpy(rl->rl_name, r->res_name, r->res_length);
  367. /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
  368. If so, receive_rcom_lock_args() won't take this copy. */
  369. if (lkb->lkb_lvbptr)
  370. memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
  371. }
  372. int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
  373. {
  374. struct dlm_ls *ls = r->res_ls;
  375. struct dlm_rcom *rc;
  376. struct dlm_mhandle *mh;
  377. struct rcom_lock *rl;
  378. int error, len = sizeof(struct rcom_lock);
  379. if (lkb->lkb_lvbptr)
  380. len += ls->ls_lvblen;
  381. error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh,
  382. seq);
  383. if (error)
  384. goto out;
  385. rl = (struct rcom_lock *) rc->rc_buf;
  386. pack_rcom_lock(r, lkb, rl);
  387. rc->rc_id = cpu_to_le64((uintptr_t)r);
  388. send_rcom(mh, rc);
  389. out:
  390. return error;
  391. }
  392. /* needs at least dlm_rcom + rcom_lock */
  393. static void receive_rcom_lock(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
  394. uint64_t seq)
  395. {
  396. __le32 rl_remid, rl_result;
  397. struct rcom_lock *rl;
  398. struct dlm_rcom *rc;
  399. struct dlm_mhandle *mh;
  400. int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
  401. dlm_recover_master_copy(ls, rc_in, &rl_remid, &rl_result);
  402. error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
  403. sizeof(struct rcom_lock), &rc, &mh, seq);
  404. if (error)
  405. return;
  406. memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
  407. rl = (struct rcom_lock *)rc->rc_buf;
  408. /* set rl_remid and rl_result from dlm_recover_master_copy() */
  409. rl->rl_remid = rl_remid;
  410. rl->rl_result = rl_result;
  411. rc->rc_id = rc_in->rc_id;
  412. rc->rc_seq_reply = rc_in->rc_seq;
  413. send_rcom(mh, rc);
  414. }
  415. /* If the lockspace doesn't exist then still send a status message
  416. back; it's possible that it just doesn't have its global_id yet. */
  417. int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in)
  418. {
  419. struct dlm_rcom *rc;
  420. struct rcom_config *rf;
  421. struct dlm_mhandle *mh;
  422. char *mb;
  423. int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
  424. mh = dlm_midcomms_get_mhandle(nodeid, mb_len, &mb);
  425. if (!mh)
  426. return -ENOBUFS;
  427. rc = (struct dlm_rcom *) mb;
  428. rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  429. rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
  430. rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
  431. rc->rc_header.h_length = cpu_to_le16(mb_len);
  432. rc->rc_header.h_cmd = DLM_RCOM;
  433. rc->rc_type = cpu_to_le32(DLM_RCOM_STATUS_REPLY);
  434. rc->rc_id = rc_in->rc_id;
  435. rc->rc_seq_reply = rc_in->rc_seq;
  436. rc->rc_result = cpu_to_le32(-ESRCH);
  437. rf = (struct rcom_config *) rc->rc_buf;
  438. rf->rf_lvblen = cpu_to_le32(~0U);
  439. dlm_midcomms_commit_mhandle(mh, NULL, 0);
  440. return 0;
  441. }
  442. /*
  443. * Ignore messages for stage Y before we set
  444. * recover_status bit for stage X:
  445. *
  446. * recover_status = 0
  447. *
  448. * dlm_recover_members()
  449. * - send nothing
  450. * - recv nothing
  451. * - ignore NAMES, NAMES_REPLY
  452. * - ignore LOOKUP, LOOKUP_REPLY
  453. * - ignore LOCK, LOCK_REPLY
  454. *
  455. * recover_status |= NODES
  456. *
  457. * dlm_recover_members_wait()
  458. *
  459. * dlm_recover_directory()
  460. * - send NAMES
  461. * - recv NAMES_REPLY
  462. * - ignore LOOKUP, LOOKUP_REPLY
  463. * - ignore LOCK, LOCK_REPLY
  464. *
  465. * recover_status |= DIR
  466. *
  467. * dlm_recover_directory_wait()
  468. *
  469. * dlm_recover_masters()
  470. * - send LOOKUP
  471. * - recv LOOKUP_REPLY
  472. *
  473. * dlm_recover_locks()
  474. * - send LOCKS
  475. * - recv LOCKS_REPLY
  476. *
  477. * recover_status |= LOCKS
  478. *
  479. * dlm_recover_locks_wait()
  480. *
  481. * recover_status |= DONE
  482. */
  483. /* Called by dlm_recv; corresponds to dlm_receive_message() but special
  484. recovery-only comms are sent through here. */
  485. void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
  486. {
  487. int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
  488. int stop, reply = 0, names = 0, lookup = 0, lock = 0;
  489. uint32_t status;
  490. uint64_t seq;
  491. switch (rc->rc_type) {
  492. case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
  493. reply = 1;
  494. break;
  495. case cpu_to_le32(DLM_RCOM_NAMES):
  496. names = 1;
  497. break;
  498. case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
  499. names = 1;
  500. reply = 1;
  501. break;
  502. case cpu_to_le32(DLM_RCOM_LOOKUP):
  503. lookup = 1;
  504. break;
  505. case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
  506. lookup = 1;
  507. reply = 1;
  508. break;
  509. case cpu_to_le32(DLM_RCOM_LOCK):
  510. lock = 1;
  511. break;
  512. case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
  513. lock = 1;
  514. reply = 1;
  515. break;
  516. }
  517. spin_lock_bh(&ls->ls_recover_lock);
  518. status = ls->ls_recover_status;
  519. stop = dlm_recovery_stopped(ls);
  520. seq = ls->ls_recover_seq;
  521. spin_unlock_bh(&ls->ls_recover_lock);
  522. if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
  523. goto ignore;
  524. if (reply && (le64_to_cpu(rc->rc_seq_reply) != seq))
  525. goto ignore;
  526. if (!(status & DLM_RS_NODES) && (names || lookup || lock))
  527. goto ignore;
  528. if (!(status & DLM_RS_DIR) && (lookup || lock))
  529. goto ignore;
  530. switch (rc->rc_type) {
  531. case cpu_to_le32(DLM_RCOM_STATUS):
  532. receive_rcom_status(ls, rc, seq);
  533. break;
  534. case cpu_to_le32(DLM_RCOM_NAMES):
  535. receive_rcom_names(ls, rc, seq);
  536. break;
  537. case cpu_to_le32(DLM_RCOM_LOOKUP):
  538. receive_rcom_lookup(ls, rc, seq);
  539. break;
  540. case cpu_to_le32(DLM_RCOM_LOCK):
  541. if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
  542. goto Eshort;
  543. receive_rcom_lock(ls, rc, seq);
  544. break;
  545. case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
  546. receive_sync_reply(ls, rc);
  547. break;
  548. case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
  549. receive_sync_reply(ls, rc);
  550. break;
  551. case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
  552. receive_rcom_lookup_reply(ls, rc);
  553. break;
  554. case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
  555. if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
  556. goto Eshort;
  557. dlm_recover_process_copy(ls, rc, seq);
  558. break;
  559. default:
  560. log_error(ls, "receive_rcom bad type %d",
  561. le32_to_cpu(rc->rc_type));
  562. }
  563. return;
  564. ignore:
  565. log_limit(ls, "dlm_receive_rcom ignore msg %d "
  566. "from %d %llu %llu recover seq %llu sts %x gen %u",
  567. le32_to_cpu(rc->rc_type),
  568. nodeid,
  569. (unsigned long long)le64_to_cpu(rc->rc_seq),
  570. (unsigned long long)le64_to_cpu(rc->rc_seq_reply),
  571. (unsigned long long)seq,
  572. status, ls->ls_generation);
  573. return;
  574. Eshort:
  575. log_error(ls, "recovery message %d from %d is too short",
  576. le32_to_cpu(rc->rc_type), nodeid);
  577. }