messenger.c 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221
  1. // SPDX-License-Identifier: GPL-2.0
  2. #include <linux/ceph/ceph_debug.h>
  3. #include <linux/crc32c.h>
  4. #include <linux/ctype.h>
  5. #include <linux/highmem.h>
  6. #include <linux/inet.h>
  7. #include <linux/kthread.h>
  8. #include <linux/net.h>
  9. #include <linux/nsproxy.h>
  10. #include <linux/sched/mm.h>
  11. #include <linux/slab.h>
  12. #include <linux/socket.h>
  13. #include <linux/string.h>
  14. #ifdef CONFIG_BLOCK
  15. #include <linux/bio.h>
  16. #endif /* CONFIG_BLOCK */
  17. #include <linux/dns_resolver.h>
  18. #include <net/tcp.h>
  19. #include <trace/events/sock.h>
  20. #include <linux/ceph/ceph_features.h>
  21. #include <linux/ceph/libceph.h>
  22. #include <linux/ceph/messenger.h>
  23. #include <linux/ceph/decode.h>
  24. #include <linux/ceph/pagelist.h>
  25. #include <linux/export.h>
  26. /*
  27. * Ceph uses the messenger to exchange ceph_msg messages with other
  28. * hosts in the system. The messenger provides ordered and reliable
  29. * delivery. We tolerate TCP disconnects by reconnecting (with
  30. * exponential backoff) in the case of a fault (disconnection, bad
  31. * crc, protocol error). Acks allow sent messages to be discarded by
  32. * the sender.
  33. */
  /*
   * We track the state of the socket on a given connection using
   * values defined below.  The transition to a new socket state is
   * handled by a function which verifies we aren't coming from an
   * unexpected state.
   *
   *      --------
   *      | NEW* |  transient initial state
   *      --------
   *          | con_sock_state_init()
   *          v
   *      ----------
   *      | CLOSED |  initialized, but no socket (and no
   *      ----------  TCP connection)
   *       ^      \
   *       |       \ con_sock_state_connecting()
   *       |        ----------------------
   *       |                              \
   *       + con_sock_state_closed()       \
   *       |+---------------------------    \
   *       | \                          \    \
   *       |  -----------                \    \
   *       |  | CLOSING |  socket event;  \    \
   *       |  -----------  await close     \    \
   *       |       ^                        \   |
   *       |       |                         \  |
   *       |       + con_sock_state_closing() \ |
   *       |      / \                         | |
   *       |     /   ---------------          | |
   *       |    /                   \         v v
   *       |   /                    --------------
   *       |  /    -----------------| CONNECTING |  socket created, TCP
   *       |  |   /                 --------------  connect initiated
   *       |  |   | con_sock_state_connected()
   *       |  |   v
   *      -------------
   *      | CONNECTED |  TCP connection established
   *      -------------
   *
   * State values for ceph_connection->sock_state; NEW is assumed to be 0.
   */
  /*
   * Values stored in ceph_connection->sock_state.  The trailing comment
   * on each value names the state(s) it may move to next.
   */
  #define CON_SOCK_STATE_NEW		0	/* -> CLOSED */
  #define CON_SOCK_STATE_CLOSED		1	/* -> CONNECTING */
  #define CON_SOCK_STATE_CONNECTING	2	/* -> CONNECTED or -> CLOSING */
  #define CON_SOCK_STATE_CONNECTED	3	/* -> CLOSING or -> CLOSED */
  #define CON_SOCK_STATE_CLOSING		4	/* -> CLOSED */
  80. static bool con_flag_valid(unsigned long con_flag)
  81. {
  82. switch (con_flag) {
  83. case CEPH_CON_F_LOSSYTX:
  84. case CEPH_CON_F_KEEPALIVE_PENDING:
  85. case CEPH_CON_F_WRITE_PENDING:
  86. case CEPH_CON_F_SOCK_CLOSED:
  87. case CEPH_CON_F_BACKOFF:
  88. return true;
  89. default:
  90. return false;
  91. }
  92. }
  93. void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
  94. {
  95. BUG_ON(!con_flag_valid(con_flag));
  96. clear_bit(con_flag, &con->flags);
  97. }
  98. void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
  99. {
  100. BUG_ON(!con_flag_valid(con_flag));
  101. set_bit(con_flag, &con->flags);
  102. }
  103. bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
  104. {
  105. BUG_ON(!con_flag_valid(con_flag));
  106. return test_bit(con_flag, &con->flags);
  107. }
  108. bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
  109. unsigned long con_flag)
  110. {
  111. BUG_ON(!con_flag_valid(con_flag));
  112. return test_and_clear_bit(con_flag, &con->flags);
  113. }
  114. bool ceph_con_flag_test_and_set(struct ceph_connection *con,
  115. unsigned long con_flag)
  116. {
  117. BUG_ON(!con_flag_valid(con_flag));
  118. return test_and_set_bit(con_flag, &con->flags);
  119. }
  /* Slab cache for struct ceph_msg; created in ceph_msgr_slab_init() */
  static struct kmem_cache *ceph_msg_cache;

  #ifdef CONFIG_LOCKDEP
  /* lockdep class applied to sk_lock of sockets made by ceph_tcp_connect() */
  static struct lock_class_key socket_class;
  #endif

  /* Forward declarations */
  static void con_fault(struct ceph_connection *con);
  static void ceph_con_workfn(struct work_struct *);
  static void cancel_con(struct ceph_connection *con);
  static void queue_con(struct ceph_connection *con);
  129. /*
  130. * Nicely render a sockaddr as a string. An array of formatted
  131. * strings is used, to approximate reentrancy.
  132. */
  133. #define ADDR_STR_COUNT_LOG 5 /* log2(# address strings in array) */
  134. #define ADDR_STR_COUNT (1 << ADDR_STR_COUNT_LOG)
  135. #define ADDR_STR_COUNT_MASK (ADDR_STR_COUNT - 1)
  136. #define MAX_ADDR_STR_LEN 64 /* 54 is enough */
  137. static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
  138. static atomic_t addr_str_seq = ATOMIC_INIT(0);
  139. struct page *ceph_zero_page; /* used in certain error cases */
  140. const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
  141. {
  142. int i;
  143. char *s;
  144. struct sockaddr_storage ss = addr->in_addr; /* align */
  145. struct sockaddr_in *in4 = (struct sockaddr_in *)&ss;
  146. struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;
  147. i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
  148. s = addr_str[i];
  149. switch (ss.ss_family) {
  150. case AF_INET:
  151. snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu",
  152. le32_to_cpu(addr->type), &in4->sin_addr,
  153. ntohs(in4->sin_port));
  154. break;
  155. case AF_INET6:
  156. snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu",
  157. le32_to_cpu(addr->type), &in6->sin6_addr,
  158. ntohs(in6->sin6_port));
  159. break;
  160. default:
  161. snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
  162. ss.ss_family);
  163. }
  164. return s;
  165. }
  166. EXPORT_SYMBOL(ceph_pr_addr);
  167. void ceph_encode_my_addr(struct ceph_messenger *msgr)
  168. {
  169. if (!ceph_msgr2(from_msgr(msgr))) {
  170. memcpy(&msgr->my_enc_addr, &msgr->inst.addr,
  171. sizeof(msgr->my_enc_addr));
  172. ceph_encode_banner_addr(&msgr->my_enc_addr);
  173. }
  174. }
  /*
   * Workqueue on which all socket reading and writing is done.
   * Allocated in ceph_msgr_init(), destroyed in _ceph_msgr_exit().
   */
  static struct workqueue_struct *ceph_msgr_wq;
  179. static int ceph_msgr_slab_init(void)
  180. {
  181. BUG_ON(ceph_msg_cache);
  182. ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
  183. if (!ceph_msg_cache)
  184. return -ENOMEM;
  185. return 0;
  186. }
  187. static void ceph_msgr_slab_exit(void)
  188. {
  189. BUG_ON(!ceph_msg_cache);
  190. kmem_cache_destroy(ceph_msg_cache);
  191. ceph_msg_cache = NULL;
  192. }
  193. static void _ceph_msgr_exit(void)
  194. {
  195. if (ceph_msgr_wq) {
  196. destroy_workqueue(ceph_msgr_wq);
  197. ceph_msgr_wq = NULL;
  198. }
  199. BUG_ON(!ceph_zero_page);
  200. put_page(ceph_zero_page);
  201. ceph_zero_page = NULL;
  202. ceph_msgr_slab_exit();
  203. }
  204. int __init ceph_msgr_init(void)
  205. {
  206. if (ceph_msgr_slab_init())
  207. return -ENOMEM;
  208. BUG_ON(ceph_zero_page);
  209. ceph_zero_page = ZERO_PAGE(0);
  210. get_page(ceph_zero_page);
  211. /*
  212. * The number of active work items is limited by the number of
  213. * connections, so leave @max_active at default.
  214. */
  215. ceph_msgr_wq = alloc_workqueue("ceph-msgr",
  216. WQ_MEM_RECLAIM | WQ_PERCPU, 0);
  217. if (ceph_msgr_wq)
  218. return 0;
  219. pr_err("msgr_init failed to create workqueue\n");
  220. _ceph_msgr_exit();
  221. return -ENOMEM;
  222. }
  223. void ceph_msgr_exit(void)
  224. {
  225. BUG_ON(ceph_msgr_wq == NULL);
  226. _ceph_msgr_exit();
  227. }
  228. void ceph_msgr_flush(void)
  229. {
  230. flush_workqueue(ceph_msgr_wq);
  231. }
  232. EXPORT_SYMBOL(ceph_msgr_flush);
  233. /* Connection socket state transition functions */
  234. static void con_sock_state_init(struct ceph_connection *con)
  235. {
  236. int old_state;
  237. old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
  238. if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
  239. printk("%s: unexpected old state %d\n", __func__, old_state);
  240. dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
  241. CON_SOCK_STATE_CLOSED);
  242. }
  243. static void con_sock_state_connecting(struct ceph_connection *con)
  244. {
  245. int old_state;
  246. old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
  247. if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
  248. printk("%s: unexpected old state %d\n", __func__, old_state);
  249. dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
  250. CON_SOCK_STATE_CONNECTING);
  251. }
  252. static void con_sock_state_connected(struct ceph_connection *con)
  253. {
  254. int old_state;
  255. old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
  256. if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
  257. printk("%s: unexpected old state %d\n", __func__, old_state);
  258. dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
  259. CON_SOCK_STATE_CONNECTED);
  260. }
  261. static void con_sock_state_closing(struct ceph_connection *con)
  262. {
  263. int old_state;
  264. old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
  265. if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
  266. old_state != CON_SOCK_STATE_CONNECTED &&
  267. old_state != CON_SOCK_STATE_CLOSING))
  268. printk("%s: unexpected old state %d\n", __func__, old_state);
  269. dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
  270. CON_SOCK_STATE_CLOSING);
  271. }
  272. static void con_sock_state_closed(struct ceph_connection *con)
  273. {
  274. int old_state;
  275. old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
  276. if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
  277. old_state != CON_SOCK_STATE_CLOSING &&
  278. old_state != CON_SOCK_STATE_CONNECTING &&
  279. old_state != CON_SOCK_STATE_CLOSED))
  280. printk("%s: unexpected old state %d\n", __func__, old_state);
  281. dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
  282. CON_SOCK_STATE_CLOSED);
  283. }
  284. /*
  285. * socket callback functions
  286. */
  287. /* data available on socket, or listen socket received a connect */
  288. static void ceph_sock_data_ready(struct sock *sk)
  289. {
  290. struct ceph_connection *con = sk->sk_user_data;
  291. trace_sk_data_ready(sk);
  292. if (atomic_read(&con->msgr->stopping)) {
  293. return;
  294. }
  295. if (sk->sk_state != TCP_CLOSE_WAIT) {
  296. dout("%s %p state = %d, queueing work\n", __func__,
  297. con, con->state);
  298. queue_con(con);
  299. }
  300. }
  301. /* socket has buffer space for writing */
  302. static void ceph_sock_write_space(struct sock *sk)
  303. {
  304. struct ceph_connection *con = sk->sk_user_data;
  305. /* only queue to workqueue if there is data we want to write,
  306. * and there is sufficient space in the socket buffer to accept
  307. * more data. clear SOCK_NOSPACE so that ceph_sock_write_space()
  308. * doesn't get called again until try_write() fills the socket
  309. * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
  310. * and net/core/stream.c:sk_stream_write_space().
  311. */
  312. if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
  313. if (sk_stream_is_writeable(sk)) {
  314. dout("%s %p queueing write work\n", __func__, con);
  315. clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
  316. queue_con(con);
  317. }
  318. } else {
  319. dout("%s %p nothing to write\n", __func__, con);
  320. }
  321. }
  322. /* socket's state has changed */
  323. static void ceph_sock_state_change(struct sock *sk)
  324. {
  325. struct ceph_connection *con = sk->sk_user_data;
  326. dout("%s %p state = %d sk_state = %u\n", __func__,
  327. con, con->state, sk->sk_state);
  328. switch (sk->sk_state) {
  329. case TCP_CLOSE:
  330. dout("%s TCP_CLOSE\n", __func__);
  331. fallthrough;
  332. case TCP_CLOSE_WAIT:
  333. dout("%s TCP_CLOSE_WAIT\n", __func__);
  334. con_sock_state_closing(con);
  335. ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
  336. queue_con(con);
  337. break;
  338. case TCP_ESTABLISHED:
  339. dout("%s TCP_ESTABLISHED\n", __func__);
  340. con_sock_state_connected(con);
  341. queue_con(con);
  342. break;
  343. default: /* Everything else is uninteresting */
  344. break;
  345. }
  346. }
  347. /*
  348. * set up socket callbacks
  349. */
  350. static void set_sock_callbacks(struct socket *sock,
  351. struct ceph_connection *con)
  352. {
  353. struct sock *sk = sock->sk;
  354. sk->sk_user_data = con;
  355. sk->sk_data_ready = ceph_sock_data_ready;
  356. sk->sk_write_space = ceph_sock_write_space;
  357. sk->sk_state_change = ceph_sock_state_change;
  358. }
  359. /*
  360. * socket helpers
  361. */
  362. /*
  363. * initiate connection to a remote socket.
  364. */
  365. int ceph_tcp_connect(struct ceph_connection *con)
  366. {
  367. struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
  368. struct socket *sock;
  369. unsigned int noio_flag;
  370. int ret;
  371. dout("%s con %p peer_addr %s\n", __func__, con,
  372. ceph_pr_addr(&con->peer_addr));
  373. BUG_ON(con->sock);
  374. /* sock_create_kern() allocates with GFP_KERNEL */
  375. noio_flag = memalloc_noio_save();
  376. ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family,
  377. SOCK_STREAM, IPPROTO_TCP, &sock);
  378. memalloc_noio_restore(noio_flag);
  379. if (ret)
  380. return ret;
  381. sock->sk->sk_allocation = GFP_NOFS;
  382. sock->sk->sk_use_task_frag = false;
  383. #ifdef CONFIG_LOCKDEP
  384. lockdep_set_class(&sock->sk->sk_lock, &socket_class);
  385. #endif
  386. set_sock_callbacks(sock, con);
  387. con_sock_state_connecting(con);
  388. ret = kernel_connect(sock, (struct sockaddr_unsized *)&ss, sizeof(ss),
  389. O_NONBLOCK);
  390. if (ret == -EINPROGRESS) {
  391. dout("connect %s EINPROGRESS sk_state = %u\n",
  392. ceph_pr_addr(&con->peer_addr),
  393. sock->sk->sk_state);
  394. } else if (ret < 0) {
  395. pr_err("connect %s error %d\n",
  396. ceph_pr_addr(&con->peer_addr), ret);
  397. sock_release(sock);
  398. return ret;
  399. }
  400. if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
  401. tcp_sock_set_nodelay(sock->sk);
  402. con->sock = sock;
  403. return 0;
  404. }
  405. /*
  406. * Shutdown/close the socket for the given connection.
  407. */
  408. int ceph_con_close_socket(struct ceph_connection *con)
  409. {
  410. int rc = 0;
  411. dout("%s con %p sock %p\n", __func__, con, con->sock);
  412. if (con->sock) {
  413. rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
  414. sock_release(con->sock);
  415. con->sock = NULL;
  416. }
  417. /*
  418. * Forcibly clear the SOCK_CLOSED flag. It gets set
  419. * independent of the connection mutex, and we could have
  420. * received a socket close event before we had the chance to
  421. * shut the socket down.
  422. */
  423. ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);
  424. con_sock_state_closed(con);
  425. return rc;
  426. }
  427. static void ceph_con_reset_protocol(struct ceph_connection *con)
  428. {
  429. dout("%s con %p\n", __func__, con);
  430. ceph_con_close_socket(con);
  431. if (con->in_msg) {
  432. WARN_ON(con->in_msg->con != con);
  433. ceph_msg_put(con->in_msg);
  434. con->in_msg = NULL;
  435. }
  436. if (con->out_msg) {
  437. WARN_ON(con->out_msg->con != con);
  438. ceph_msg_put(con->out_msg);
  439. con->out_msg = NULL;
  440. }
  441. if (con->bounce_page) {
  442. __free_page(con->bounce_page);
  443. con->bounce_page = NULL;
  444. }
  445. if (ceph_msgr2(from_msgr(con->msgr)))
  446. ceph_con_v2_reset_protocol(con);
  447. else
  448. ceph_con_v1_reset_protocol(con);
  449. }
  450. /*
  451. * Reset a connection. Discard all incoming and outgoing messages
  452. * and clear *_seq state.
  453. */
  454. static void ceph_msg_remove(struct ceph_msg *msg)
  455. {
  456. list_del_init(&msg->list_head);
  457. ceph_msg_put(msg);
  458. }
  459. static void ceph_msg_remove_list(struct list_head *head)
  460. {
  461. while (!list_empty(head)) {
  462. struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
  463. list_head);
  464. ceph_msg_remove(msg);
  465. }
  466. }
  467. void ceph_con_reset_session(struct ceph_connection *con)
  468. {
  469. dout("%s con %p\n", __func__, con);
  470. WARN_ON(con->in_msg);
  471. WARN_ON(con->out_msg);
  472. ceph_msg_remove_list(&con->out_queue);
  473. ceph_msg_remove_list(&con->out_sent);
  474. con->out_seq = 0;
  475. con->in_seq = 0;
  476. con->in_seq_acked = 0;
  477. if (ceph_msgr2(from_msgr(con->msgr)))
  478. ceph_con_v2_reset_session(con);
  479. else
  480. ceph_con_v1_reset_session(con);
  481. }
  482. /*
  483. * mark a peer down. drop any open connections.
  484. */
  485. void ceph_con_close(struct ceph_connection *con)
  486. {
  487. mutex_lock(&con->mutex);
  488. dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
  489. con->state = CEPH_CON_S_CLOSED;
  490. ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next
  491. connect */
  492. ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
  493. ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
  494. ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);
  495. ceph_con_reset_protocol(con);
  496. ceph_con_reset_session(con);
  497. cancel_con(con);
  498. mutex_unlock(&con->mutex);
  499. }
  500. EXPORT_SYMBOL(ceph_con_close);
  501. /*
  502. * Reopen a closed connection, with a new peer address.
  503. */
  504. void ceph_con_open(struct ceph_connection *con,
  505. __u8 entity_type, __u64 entity_num,
  506. struct ceph_entity_addr *addr)
  507. {
  508. mutex_lock(&con->mutex);
  509. dout("con_open %p %s\n", con, ceph_pr_addr(addr));
  510. WARN_ON(con->state != CEPH_CON_S_CLOSED);
  511. con->state = CEPH_CON_S_PREOPEN;
  512. con->peer_name.type = (__u8) entity_type;
  513. con->peer_name.num = cpu_to_le64(entity_num);
  514. memcpy(&con->peer_addr, addr, sizeof(*addr));
  515. con->delay = 0; /* reset backoff memory */
  516. mutex_unlock(&con->mutex);
  517. queue_con(con);
  518. }
  519. EXPORT_SYMBOL(ceph_con_open);
  520. /*
  521. * return true if this connection ever successfully opened
  522. */
  523. bool ceph_con_opened(struct ceph_connection *con)
  524. {
  525. if (ceph_msgr2(from_msgr(con->msgr)))
  526. return ceph_con_v2_opened(con);
  527. return ceph_con_v1_opened(con);
  528. }
  529. /*
  530. * initialize a new connection.
  531. */
  532. void ceph_con_init(struct ceph_connection *con, void *private,
  533. const struct ceph_connection_operations *ops,
  534. struct ceph_messenger *msgr)
  535. {
  536. dout("con_init %p\n", con);
  537. memset(con, 0, sizeof(*con));
  538. con->private = private;
  539. con->ops = ops;
  540. con->msgr = msgr;
  541. con_sock_state_init(con);
  542. mutex_init(&con->mutex);
  543. INIT_LIST_HEAD(&con->out_queue);
  544. INIT_LIST_HEAD(&con->out_sent);
  545. INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
  546. con->state = CEPH_CON_S_CLOSED;
  547. }
  548. EXPORT_SYMBOL(ceph_con_init);
  549. /*
  550. * We maintain a global counter to order connection attempts. Get
  551. * a unique seq greater than @gt.
  552. */
  553. u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
  554. {
  555. u32 ret;
  556. spin_lock(&msgr->global_seq_lock);
  557. if (msgr->global_seq < gt)
  558. msgr->global_seq = gt;
  559. ret = ++msgr->global_seq;
  560. spin_unlock(&msgr->global_seq_lock);
  561. return ret;
  562. }
  563. /*
  564. * Discard messages that have been acked by the server.
  565. */
  566. void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
  567. {
  568. struct ceph_msg *msg;
  569. u64 seq;
  570. dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
  571. while (!list_empty(&con->out_sent)) {
  572. msg = list_first_entry(&con->out_sent, struct ceph_msg,
  573. list_head);
  574. WARN_ON(msg->needs_out_seq);
  575. seq = le64_to_cpu(msg->hdr.seq);
  576. if (seq > ack_seq)
  577. break;
  578. dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
  579. msg, seq);
  580. ceph_msg_remove(msg);
  581. }
  582. }
  583. /*
  584. * Discard messages that have been requeued in con_fault(), up to
  585. * reconnect_seq. This avoids gratuitously resending messages that
  586. * the server had received and handled prior to reconnect.
  587. */
  588. void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
  589. {
  590. struct ceph_msg *msg;
  591. u64 seq;
  592. dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
  593. while (!list_empty(&con->out_queue)) {
  594. msg = list_first_entry(&con->out_queue, struct ceph_msg,
  595. list_head);
  596. if (msg->needs_out_seq)
  597. break;
  598. seq = le64_to_cpu(msg->hdr.seq);
  599. if (seq > reconnect_seq)
  600. break;
  601. dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
  602. msg, seq);
  603. ceph_msg_remove(msg);
  604. }
  605. }
  606. #ifdef CONFIG_BLOCK
  607. /*
  608. * For a bio data item, a piece is whatever remains of the next
  609. * entry in the current bio iovec, or the first entry in the next
  610. * bio in the list.
  611. */
  612. static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
  613. size_t length)
  614. {
  615. struct ceph_msg_data *data = cursor->data;
  616. struct ceph_bio_iter *it = &cursor->bio_iter;
  617. cursor->resid = min_t(size_t, length, data->bio_length);
  618. *it = data->bio_pos;
  619. if (cursor->resid < it->iter.bi_size)
  620. it->iter.bi_size = cursor->resid;
  621. BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
  622. }
  623. static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
  624. size_t *page_offset,
  625. size_t *length)
  626. {
  627. struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
  628. cursor->bio_iter.iter);
  629. *page_offset = bv.bv_offset;
  630. *length = bv.bv_len;
  631. return bv.bv_page;
  632. }
  633. static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
  634. size_t bytes)
  635. {
  636. struct ceph_bio_iter *it = &cursor->bio_iter;
  637. struct page *page = bio_iter_page(it->bio, it->iter);
  638. BUG_ON(bytes > cursor->resid);
  639. BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
  640. cursor->resid -= bytes;
  641. bio_advance_iter(it->bio, &it->iter, bytes);
  642. if (!cursor->resid)
  643. return false; /* no more data */
  644. if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
  645. page == bio_iter_page(it->bio, it->iter)))
  646. return false; /* more bytes to process in this segment */
  647. if (!it->iter.bi_size) {
  648. it->bio = it->bio->bi_next;
  649. it->iter = it->bio->bi_iter;
  650. if (cursor->resid < it->iter.bi_size)
  651. it->iter.bi_size = cursor->resid;
  652. }
  653. BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
  654. return true;
  655. }
  656. #endif /* CONFIG_BLOCK */
  657. static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
  658. size_t length)
  659. {
  660. struct ceph_msg_data *data = cursor->data;
  661. struct bio_vec *bvecs = data->bvec_pos.bvecs;
  662. cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
  663. cursor->bvec_iter = data->bvec_pos.iter;
  664. cursor->bvec_iter.bi_size = cursor->resid;
  665. BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
  666. }
  667. static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
  668. size_t *page_offset,
  669. size_t *length)
  670. {
  671. struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
  672. cursor->bvec_iter);
  673. *page_offset = bv.bv_offset;
  674. *length = bv.bv_len;
  675. return bv.bv_page;
  676. }
  677. static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
  678. size_t bytes)
  679. {
  680. struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
  681. struct page *page = bvec_iter_page(bvecs, cursor->bvec_iter);
  682. BUG_ON(bytes > cursor->resid);
  683. BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
  684. cursor->resid -= bytes;
  685. bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
  686. if (!cursor->resid)
  687. return false; /* no more data */
  688. if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
  689. page == bvec_iter_page(bvecs, cursor->bvec_iter)))
  690. return false; /* more bytes to process in this segment */
  691. BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
  692. return true;
  693. }
  694. /*
  695. * For a page array, a piece comes from the first page in the array
  696. * that has not already been fully consumed.
  697. */
  698. static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
  699. size_t length)
  700. {
  701. struct ceph_msg_data *data = cursor->data;
  702. int page_count;
  703. BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
  704. BUG_ON(!data->pages);
  705. BUG_ON(!data->length);
  706. cursor->resid = min(length, data->length);
  707. page_count = calc_pages_for(data->alignment, (u64)data->length);
  708. cursor->page_offset = data->alignment & ~PAGE_MASK;
  709. cursor->page_index = 0;
  710. BUG_ON(page_count > (int)USHRT_MAX);
  711. cursor->page_count = (unsigned short)page_count;
  712. BUG_ON(length > SIZE_MAX - cursor->page_offset);
  713. }
  714. static struct page *
  715. ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
  716. size_t *page_offset, size_t *length)
  717. {
  718. struct ceph_msg_data *data = cursor->data;
  719. BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
  720. BUG_ON(cursor->page_index >= cursor->page_count);
  721. BUG_ON(cursor->page_offset >= PAGE_SIZE);
  722. *page_offset = cursor->page_offset;
  723. *length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
  724. return data->pages[cursor->page_index];
  725. }
  726. static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
  727. size_t bytes)
  728. {
  729. BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
  730. BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
  731. /* Advance the cursor page offset */
  732. cursor->resid -= bytes;
  733. cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
  734. if (!bytes || cursor->page_offset)
  735. return false; /* more bytes to process in the current page */
  736. if (!cursor->resid)
  737. return false; /* no more data */
  738. /* Move on to the next page; offset is already at 0 */
  739. BUG_ON(cursor->page_index >= cursor->page_count);
  740. cursor->page_index++;
  741. return true;
  742. }
  743. /*
  744. * For a pagelist, a piece is whatever remains to be consumed in the
  745. * first page in the list, or the front of the next page.
  746. */
  747. static void
  748. ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
  749. size_t length)
  750. {
  751. struct ceph_msg_data *data = cursor->data;
  752. struct ceph_pagelist *pagelist;
  753. struct page *page;
  754. BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
  755. pagelist = data->pagelist;
  756. BUG_ON(!pagelist);
  757. if (!length)
  758. return; /* pagelist can be assigned but empty */
  759. BUG_ON(list_empty(&pagelist->head));
  760. page = list_first_entry(&pagelist->head, struct page, lru);
  761. cursor->resid = min(length, pagelist->length);
  762. cursor->page = page;
  763. cursor->offset = 0;
  764. }
  765. static struct page *
  766. ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
  767. size_t *page_offset, size_t *length)
  768. {
  769. struct ceph_msg_data *data = cursor->data;
  770. struct ceph_pagelist *pagelist;
  771. BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
  772. pagelist = data->pagelist;
  773. BUG_ON(!pagelist);
  774. BUG_ON(!cursor->page);
  775. BUG_ON(cursor->offset + cursor->resid != pagelist->length);
  776. /* offset of first page in pagelist is always 0 */
  777. *page_offset = cursor->offset & ~PAGE_MASK;
  778. *length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
  779. return cursor->page;
  780. }
  781. static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
  782. size_t bytes)
  783. {
  784. struct ceph_msg_data *data = cursor->data;
  785. struct ceph_pagelist *pagelist;
  786. BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
  787. pagelist = data->pagelist;
  788. BUG_ON(!pagelist);
  789. BUG_ON(cursor->offset + cursor->resid != pagelist->length);
  790. BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
  791. /* Advance the cursor offset */
  792. cursor->resid -= bytes;
  793. cursor->offset += bytes;
  794. /* offset of first page in pagelist is always 0 */
  795. if (!bytes || cursor->offset & ~PAGE_MASK)
  796. return false; /* more bytes to process in the current page */
  797. if (!cursor->resid)
  798. return false; /* no more data */
  799. /* Move on to the next page */
  800. BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
  801. cursor->page = list_next_entry(cursor->page, lru);
  802. return true;
  803. }
  804. static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor,
  805. size_t length)
  806. {
  807. struct ceph_msg_data *data = cursor->data;
  808. cursor->iov_iter = data->iter;
  809. cursor->lastlen = 0;
  810. iov_iter_truncate(&cursor->iov_iter, length);
  811. cursor->resid = iov_iter_count(&cursor->iov_iter);
  812. }
  813. static struct page *ceph_msg_data_iter_next(struct ceph_msg_data_cursor *cursor,
  814. size_t *page_offset, size_t *length)
  815. {
  816. struct page *page;
  817. ssize_t len;
  818. if (cursor->lastlen)
  819. iov_iter_revert(&cursor->iov_iter, cursor->lastlen);
  820. len = iov_iter_get_pages2(&cursor->iov_iter, &page, PAGE_SIZE,
  821. 1, page_offset);
  822. BUG_ON(len < 0);
  823. cursor->lastlen = len;
  824. /*
  825. * FIXME: The assumption is that the pages represented by the iov_iter
  826. * are pinned, with the references held by the upper-level
  827. * callers, or by virtue of being under writeback. Eventually,
  828. * we'll get an iov_iter_get_pages2 variant that doesn't take
  829. * page refs. Until then, just put the page ref.
  830. */
  831. VM_BUG_ON_PAGE(!PageWriteback(page) && page_count(page) < 2, page);
  832. put_page(page);
  833. *length = min_t(size_t, len, cursor->resid);
  834. return page;
  835. }
  836. static bool ceph_msg_data_iter_advance(struct ceph_msg_data_cursor *cursor,
  837. size_t bytes)
  838. {
  839. BUG_ON(bytes > cursor->resid);
  840. cursor->resid -= bytes;
  841. if (bytes < cursor->lastlen) {
  842. cursor->lastlen -= bytes;
  843. } else {
  844. iov_iter_advance(&cursor->iov_iter, bytes - cursor->lastlen);
  845. cursor->lastlen = 0;
  846. }
  847. return cursor->resid;
  848. }
  849. /*
  850. * Message data is handled (sent or received) in pieces, where each
  851. * piece resides on a single page. The network layer might not
  852. * consume an entire piece at once. A data item's cursor keeps
  853. * track of which piece is next to process and how much remains to
  854. * be processed in that piece. It also tracks whether the current
  855. * piece is the last one in the data item.
  856. */
  857. static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
  858. {
  859. size_t length = cursor->total_resid;
  860. switch (cursor->data->type) {
  861. case CEPH_MSG_DATA_PAGELIST:
  862. ceph_msg_data_pagelist_cursor_init(cursor, length);
  863. break;
  864. case CEPH_MSG_DATA_PAGES:
  865. ceph_msg_data_pages_cursor_init(cursor, length);
  866. break;
  867. #ifdef CONFIG_BLOCK
  868. case CEPH_MSG_DATA_BIO:
  869. ceph_msg_data_bio_cursor_init(cursor, length);
  870. break;
  871. #endif /* CONFIG_BLOCK */
  872. case CEPH_MSG_DATA_BVECS:
  873. ceph_msg_data_bvecs_cursor_init(cursor, length);
  874. break;
  875. case CEPH_MSG_DATA_ITER:
  876. ceph_msg_data_iter_cursor_init(cursor, length);
  877. break;
  878. case CEPH_MSG_DATA_NONE:
  879. default:
  880. /* BUG(); */
  881. break;
  882. }
  883. cursor->need_crc = true;
  884. }
  885. void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
  886. struct ceph_msg *msg, size_t length)
  887. {
  888. BUG_ON(!length);
  889. BUG_ON(length > msg->data_length);
  890. BUG_ON(!msg->num_data_items);
  891. cursor->total_resid = length;
  892. cursor->data = msg->data;
  893. cursor->sr_resid = 0;
  894. __ceph_msg_data_cursor_init(cursor);
  895. }
  896. /*
  897. * Return the page containing the next piece to process for a given
  898. * data item, and supply the page offset and length of that piece.
  899. * Indicate whether this is the last piece in this data item.
  900. */
  901. struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
  902. size_t *page_offset, size_t *length)
  903. {
  904. struct page *page;
  905. switch (cursor->data->type) {
  906. case CEPH_MSG_DATA_PAGELIST:
  907. page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
  908. break;
  909. case CEPH_MSG_DATA_PAGES:
  910. page = ceph_msg_data_pages_next(cursor, page_offset, length);
  911. break;
  912. #ifdef CONFIG_BLOCK
  913. case CEPH_MSG_DATA_BIO:
  914. page = ceph_msg_data_bio_next(cursor, page_offset, length);
  915. break;
  916. #endif /* CONFIG_BLOCK */
  917. case CEPH_MSG_DATA_BVECS:
  918. page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
  919. break;
  920. case CEPH_MSG_DATA_ITER:
  921. page = ceph_msg_data_iter_next(cursor, page_offset, length);
  922. break;
  923. case CEPH_MSG_DATA_NONE:
  924. default:
  925. page = NULL;
  926. break;
  927. }
  928. BUG_ON(!page);
  929. BUG_ON(*page_offset + *length > PAGE_SIZE);
  930. BUG_ON(!*length);
  931. BUG_ON(*length > cursor->resid);
  932. return page;
  933. }
  934. /*
  935. * Returns true if the result moves the cursor on to the next piece
  936. * of the data item.
  937. */
  938. void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
  939. {
  940. bool new_piece;
  941. BUG_ON(bytes > cursor->resid);
  942. switch (cursor->data->type) {
  943. case CEPH_MSG_DATA_PAGELIST:
  944. new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
  945. break;
  946. case CEPH_MSG_DATA_PAGES:
  947. new_piece = ceph_msg_data_pages_advance(cursor, bytes);
  948. break;
  949. #ifdef CONFIG_BLOCK
  950. case CEPH_MSG_DATA_BIO:
  951. new_piece = ceph_msg_data_bio_advance(cursor, bytes);
  952. break;
  953. #endif /* CONFIG_BLOCK */
  954. case CEPH_MSG_DATA_BVECS:
  955. new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
  956. break;
  957. case CEPH_MSG_DATA_ITER:
  958. new_piece = ceph_msg_data_iter_advance(cursor, bytes);
  959. break;
  960. case CEPH_MSG_DATA_NONE:
  961. default:
  962. BUG();
  963. break;
  964. }
  965. cursor->total_resid -= bytes;
  966. if (!cursor->resid && cursor->total_resid) {
  967. cursor->data++;
  968. __ceph_msg_data_cursor_init(cursor);
  969. new_piece = true;
  970. }
  971. cursor->need_crc = new_piece;
  972. }
  973. u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
  974. unsigned int length)
  975. {
  976. char *kaddr;
  977. kaddr = kmap(page);
  978. BUG_ON(kaddr == NULL);
  979. crc = crc32c(crc, kaddr + page_offset, length);
  980. kunmap(page);
  981. return crc;
  982. }
  983. bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
  984. {
  985. struct sockaddr_storage ss = addr->in_addr; /* align */
  986. struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
  987. struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr;
  988. switch (ss.ss_family) {
  989. case AF_INET:
  990. return addr4->s_addr == htonl(INADDR_ANY);
  991. case AF_INET6:
  992. return ipv6_addr_any(addr6);
  993. default:
  994. return true;
  995. }
  996. }
  997. EXPORT_SYMBOL(ceph_addr_is_blank);
  998. int ceph_addr_port(const struct ceph_entity_addr *addr)
  999. {
  1000. switch (get_unaligned(&addr->in_addr.ss_family)) {
  1001. case AF_INET:
  1002. return ntohs(get_unaligned(&((struct sockaddr_in *)&addr->in_addr)->sin_port));
  1003. case AF_INET6:
  1004. return ntohs(get_unaligned(&((struct sockaddr_in6 *)&addr->in_addr)->sin6_port));
  1005. }
  1006. return 0;
  1007. }
  1008. void ceph_addr_set_port(struct ceph_entity_addr *addr, int p)
  1009. {
  1010. switch (get_unaligned(&addr->in_addr.ss_family)) {
  1011. case AF_INET:
  1012. put_unaligned(htons(p), &((struct sockaddr_in *)&addr->in_addr)->sin_port);
  1013. break;
  1014. case AF_INET6:
  1015. put_unaligned(htons(p), &((struct sockaddr_in6 *)&addr->in_addr)->sin6_port);
  1016. break;
  1017. }
  1018. }
  1019. /*
  1020. * Unlike other *_pton function semantics, zero indicates success.
  1021. */
  1022. static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
  1023. char delim, const char **ipend)
  1024. {
  1025. memset(&addr->in_addr, 0, sizeof(addr->in_addr));
  1026. if (in4_pton(str, len, (u8 *)&((struct sockaddr_in *)&addr->in_addr)->sin_addr.s_addr, delim, ipend)) {
  1027. put_unaligned(AF_INET, &addr->in_addr.ss_family);
  1028. return 0;
  1029. }
  1030. if (in6_pton(str, len, (u8 *)&((struct sockaddr_in6 *)&addr->in_addr)->sin6_addr.s6_addr, delim, ipend)) {
  1031. put_unaligned(AF_INET6, &addr->in_addr.ss_family);
  1032. return 0;
  1033. }
  1034. return -EINVAL;
  1035. }
  1036. /*
  1037. * Extract hostname string and resolve using kernel DNS facility.
  1038. */
  1039. #ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
  1040. static int ceph_dns_resolve_name(const char *name, size_t namelen,
  1041. struct ceph_entity_addr *addr, char delim, const char **ipend)
  1042. {
  1043. const char *end, *delim_p;
  1044. char *colon_p, *ip_addr = NULL;
  1045. int ip_len, ret;
  1046. /*
  1047. * The end of the hostname occurs immediately preceding the delimiter or
  1048. * the port marker (':') where the delimiter takes precedence.
  1049. */
  1050. delim_p = memchr(name, delim, namelen);
  1051. colon_p = memchr(name, ':', namelen);
  1052. if (delim_p && colon_p)
  1053. end = min(delim_p, colon_p);
  1054. else if (!delim_p && colon_p)
  1055. end = colon_p;
  1056. else {
  1057. end = delim_p;
  1058. if (!end) /* case: hostname:/ */
  1059. end = name + namelen;
  1060. }
  1061. if (end <= name)
  1062. return -EINVAL;
  1063. /* do dns_resolve upcall */
  1064. ip_len = dns_query(current->nsproxy->net_ns,
  1065. NULL, name, end - name, NULL, &ip_addr, NULL, false);
  1066. if (ip_len > 0)
  1067. ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL);
  1068. else
  1069. ret = -ESRCH;
  1070. kfree(ip_addr);
  1071. *ipend = end;
  1072. pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
  1073. ret, ret ? "failed" : ceph_pr_addr(addr));
  1074. return ret;
  1075. }
  1076. #else
  1077. static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
  1078. struct ceph_entity_addr *addr, char delim, const char **ipend)
  1079. {
  1080. return -EINVAL;
  1081. }
  1082. #endif
  1083. /*
  1084. * Parse a server name (IP or hostname). If a valid IP address is not found
  1085. * then try to extract a hostname to resolve using userspace DNS upcall.
  1086. */
  1087. static int ceph_parse_server_name(const char *name, size_t namelen,
  1088. struct ceph_entity_addr *addr, char delim, const char **ipend)
  1089. {
  1090. int ret;
  1091. ret = ceph_pton(name, namelen, addr, delim, ipend);
  1092. if (ret)
  1093. ret = ceph_dns_resolve_name(name, namelen, addr, delim, ipend);
  1094. return ret;
  1095. }
  1096. /*
  1097. * Parse an ip[:port] list into an addr array. Use the default
  1098. * monitor port if a port isn't specified.
  1099. */
  1100. int ceph_parse_ips(const char *c, const char *end,
  1101. struct ceph_entity_addr *addr,
  1102. int max_count, int *count, char delim)
  1103. {
  1104. int i, ret = -EINVAL;
  1105. const char *p = c;
  1106. dout("parse_ips on '%.*s'\n", (int)(end-c), c);
  1107. for (i = 0; i < max_count; i++) {
  1108. char cur_delim = delim;
  1109. const char *ipend;
  1110. int port;
  1111. if (*p == '[') {
  1112. cur_delim = ']';
  1113. p++;
  1114. }
  1115. ret = ceph_parse_server_name(p, end - p, &addr[i], cur_delim,
  1116. &ipend);
  1117. if (ret)
  1118. goto bad;
  1119. ret = -EINVAL;
  1120. p = ipend;
  1121. if (cur_delim == ']') {
  1122. if (*p != ']') {
  1123. dout("missing matching ']'\n");
  1124. goto bad;
  1125. }
  1126. p++;
  1127. }
  1128. /* port? */
  1129. if (p < end && *p == ':') {
  1130. port = 0;
  1131. p++;
  1132. while (p < end && *p >= '0' && *p <= '9') {
  1133. port = (port * 10) + (*p - '0');
  1134. p++;
  1135. }
  1136. if (port == 0)
  1137. port = CEPH_MON_PORT;
  1138. else if (port > 65535)
  1139. goto bad;
  1140. } else {
  1141. port = CEPH_MON_PORT;
  1142. }
  1143. ceph_addr_set_port(&addr[i], port);
  1144. /*
  1145. * We want the type to be set according to ms_mode
  1146. * option, but options are normally parsed after mon
  1147. * addresses. Rather than complicating parsing, set
  1148. * to LEGACY and override in build_initial_monmap()
  1149. * for mon addresses and ceph_messenger_init() for
  1150. * ip option.
  1151. */
  1152. addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
  1153. addr[i].nonce = 0;
  1154. dout("%s got %s\n", __func__, ceph_pr_addr(&addr[i]));
  1155. if (p == end)
  1156. break;
  1157. if (*p != delim)
  1158. goto bad;
  1159. p++;
  1160. }
  1161. if (p != end)
  1162. goto bad;
  1163. if (count)
  1164. *count = i + 1;
  1165. return 0;
  1166. bad:
  1167. return ret;
  1168. }
  1169. /*
  1170. * Process message. This happens in the worker thread. The callback should
  1171. * be careful not to do anything that waits on other incoming messages or it
  1172. * may deadlock.
  1173. */
  1174. void ceph_con_process_message(struct ceph_connection *con)
  1175. {
  1176. struct ceph_msg *msg = con->in_msg;
  1177. BUG_ON(con->in_msg->con != con);
  1178. con->in_msg = NULL;
  1179. /* if first message, set peer_name */
  1180. if (con->peer_name.type == 0)
  1181. con->peer_name = msg->hdr.src;
  1182. con->in_seq++;
  1183. mutex_unlock(&con->mutex);
  1184. dout("===== %p %llu from %s%lld %d=%s len %d+%d+%d (%u %u %u) =====\n",
  1185. msg, le64_to_cpu(msg->hdr.seq),
  1186. ENTITY_NAME(msg->hdr.src),
  1187. le16_to_cpu(msg->hdr.type),
  1188. ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
  1189. le32_to_cpu(msg->hdr.front_len),
  1190. le32_to_cpu(msg->hdr.middle_len),
  1191. le32_to_cpu(msg->hdr.data_len),
  1192. con->in_front_crc, con->in_middle_crc, con->in_data_crc);
  1193. con->ops->dispatch(con, msg);
  1194. mutex_lock(&con->mutex);
  1195. }
  1196. /*
  1197. * Atomically queue work on a connection after the specified delay.
  1198. * Bump @con reference to avoid races with connection teardown.
  1199. * Returns 0 if work was queued, or an error code otherwise.
  1200. */
  1201. static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
  1202. {
  1203. if (!con->ops->get(con)) {
  1204. dout("%s %p ref count 0\n", __func__, con);
  1205. return -ENOENT;
  1206. }
  1207. if (delay >= HZ)
  1208. delay = round_jiffies_relative(delay);
  1209. dout("%s %p %lu\n", __func__, con, delay);
  1210. if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
  1211. dout("%s %p - already queued\n", __func__, con);
  1212. con->ops->put(con);
  1213. return -EBUSY;
  1214. }
  1215. return 0;
  1216. }
  1217. static void queue_con(struct ceph_connection *con)
  1218. {
  1219. (void) queue_con_delay(con, 0);
  1220. }
  1221. static void cancel_con(struct ceph_connection *con)
  1222. {
  1223. if (cancel_delayed_work(&con->work)) {
  1224. dout("%s %p\n", __func__, con);
  1225. con->ops->put(con);
  1226. }
  1227. }
  1228. static bool con_sock_closed(struct ceph_connection *con)
  1229. {
  1230. if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
  1231. return false;
  1232. #define CASE(x) \
  1233. case CEPH_CON_S_ ## x: \
  1234. con->error_msg = "socket closed (con state " #x ")"; \
  1235. break;
  1236. switch (con->state) {
  1237. CASE(CLOSED);
  1238. CASE(PREOPEN);
  1239. CASE(V1_BANNER);
  1240. CASE(V1_CONNECT_MSG);
  1241. CASE(V2_BANNER_PREFIX);
  1242. CASE(V2_BANNER_PAYLOAD);
  1243. CASE(V2_HELLO);
  1244. CASE(V2_AUTH);
  1245. CASE(V2_AUTH_SIGNATURE);
  1246. CASE(V2_SESSION_CONNECT);
  1247. CASE(V2_SESSION_RECONNECT);
  1248. CASE(OPEN);
  1249. CASE(STANDBY);
  1250. default:
  1251. BUG();
  1252. }
  1253. #undef CASE
  1254. return true;
  1255. }
  1256. static bool con_backoff(struct ceph_connection *con)
  1257. {
  1258. int ret;
  1259. if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
  1260. return false;
  1261. ret = queue_con_delay(con, con->delay);
  1262. if (ret) {
  1263. dout("%s: con %p FAILED to back off %lu\n", __func__,
  1264. con, con->delay);
  1265. BUG_ON(ret == -ENOENT);
  1266. ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
  1267. }
  1268. return true;
  1269. }
  1270. /* Finish fault handling; con->mutex must *not* be held here */
  1271. static void con_fault_finish(struct ceph_connection *con)
  1272. {
  1273. dout("%s %p\n", __func__, con);
  1274. /*
  1275. * in case we faulted due to authentication, invalidate our
  1276. * current tickets so that we can get new ones.
  1277. */
  1278. if (!ceph_msgr2(from_msgr(con->msgr)) && con->v1.auth_retry) {
  1279. dout("auth_retry %d, invalidating\n", con->v1.auth_retry);
  1280. if (con->ops->invalidate_authorizer)
  1281. con->ops->invalidate_authorizer(con);
  1282. con->v1.auth_retry = 0;
  1283. }
  1284. if (con->ops->fault)
  1285. con->ops->fault(con);
  1286. }
  1287. /*
  1288. * Do some work on a connection. Drop a connection ref when we're done.
  1289. */
  1290. static void ceph_con_workfn(struct work_struct *work)
  1291. {
  1292. struct ceph_connection *con = container_of(work, struct ceph_connection,
  1293. work.work);
  1294. bool fault;
  1295. mutex_lock(&con->mutex);
  1296. while (true) {
  1297. int ret;
  1298. if ((fault = con_sock_closed(con))) {
  1299. dout("%s: con %p SOCK_CLOSED\n", __func__, con);
  1300. break;
  1301. }
  1302. if (con_backoff(con)) {
  1303. dout("%s: con %p BACKOFF\n", __func__, con);
  1304. break;
  1305. }
  1306. if (con->state == CEPH_CON_S_STANDBY) {
  1307. dout("%s: con %p STANDBY\n", __func__, con);
  1308. break;
  1309. }
  1310. if (con->state == CEPH_CON_S_CLOSED) {
  1311. dout("%s: con %p CLOSED\n", __func__, con);
  1312. BUG_ON(con->sock);
  1313. break;
  1314. }
  1315. if (con->state == CEPH_CON_S_PREOPEN) {
  1316. dout("%s: con %p PREOPEN\n", __func__, con);
  1317. BUG_ON(con->sock);
  1318. }
  1319. if (ceph_msgr2(from_msgr(con->msgr)))
  1320. ret = ceph_con_v2_try_read(con);
  1321. else
  1322. ret = ceph_con_v1_try_read(con);
  1323. if (ret < 0) {
  1324. if (ret == -EAGAIN)
  1325. continue;
  1326. if (!con->error_msg)
  1327. con->error_msg = "socket error on read";
  1328. fault = true;
  1329. break;
  1330. }
  1331. if (ceph_msgr2(from_msgr(con->msgr)))
  1332. ret = ceph_con_v2_try_write(con);
  1333. else
  1334. ret = ceph_con_v1_try_write(con);
  1335. if (ret < 0) {
  1336. if (ret == -EAGAIN)
  1337. continue;
  1338. if (!con->error_msg)
  1339. con->error_msg = "socket error on write";
  1340. fault = true;
  1341. }
  1342. break; /* If we make it to here, we're done */
  1343. }
  1344. if (fault)
  1345. con_fault(con);
  1346. mutex_unlock(&con->mutex);
  1347. if (fault)
  1348. con_fault_finish(con);
  1349. con->ops->put(con);
  1350. }
  1351. /*
  1352. * Generic error/fault handler. A retry mechanism is used with
  1353. * exponential backoff
  1354. */
  1355. static void con_fault(struct ceph_connection *con)
  1356. {
  1357. dout("fault %p state %d to peer %s\n",
  1358. con, con->state, ceph_pr_addr(&con->peer_addr));
  1359. pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
  1360. ceph_pr_addr(&con->peer_addr), con->error_msg);
  1361. con->error_msg = NULL;
  1362. WARN_ON(con->state == CEPH_CON_S_STANDBY ||
  1363. con->state == CEPH_CON_S_CLOSED);
  1364. ceph_con_reset_protocol(con);
  1365. if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
  1366. dout("fault on LOSSYTX channel, marking CLOSED\n");
  1367. con->state = CEPH_CON_S_CLOSED;
  1368. return;
  1369. }
  1370. /* Requeue anything that hasn't been acked */
  1371. list_splice_init(&con->out_sent, &con->out_queue);
  1372. /* If there are no messages queued or keepalive pending, place
  1373. * the connection in a STANDBY state */
  1374. if (list_empty(&con->out_queue) &&
  1375. !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
  1376. dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
  1377. ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
  1378. con->state = CEPH_CON_S_STANDBY;
  1379. } else {
  1380. /* retry after a delay. */
  1381. con->state = CEPH_CON_S_PREOPEN;
  1382. if (!con->delay) {
  1383. con->delay = BASE_DELAY_INTERVAL;
  1384. } else if (con->delay < MAX_DELAY_INTERVAL) {
  1385. con->delay *= 2;
  1386. if (con->delay > MAX_DELAY_INTERVAL)
  1387. con->delay = MAX_DELAY_INTERVAL;
  1388. }
  1389. ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
  1390. queue_con(con);
  1391. }
  1392. }
  1393. void ceph_messenger_reset_nonce(struct ceph_messenger *msgr)
  1394. {
  1395. u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000;
  1396. msgr->inst.addr.nonce = cpu_to_le32(nonce);
  1397. ceph_encode_my_addr(msgr);
  1398. }
  1399. /*
  1400. * initialize a new messenger instance
  1401. */
  1402. void ceph_messenger_init(struct ceph_messenger *msgr,
  1403. struct ceph_entity_addr *myaddr)
  1404. {
  1405. spin_lock_init(&msgr->global_seq_lock);
  1406. if (myaddr) {
  1407. memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr,
  1408. sizeof(msgr->inst.addr.in_addr));
  1409. ceph_addr_set_port(&msgr->inst.addr, 0);
  1410. }
  1411. /*
  1412. * Since nautilus, clients are identified using type ANY.
  1413. * For msgr1, ceph_encode_banner_addr() munges it to NONE.
  1414. */
  1415. msgr->inst.addr.type = CEPH_ENTITY_ADDR_TYPE_ANY;
  1416. /* generate a random non-zero nonce */
  1417. do {
  1418. get_random_bytes(&msgr->inst.addr.nonce,
  1419. sizeof(msgr->inst.addr.nonce));
  1420. } while (!msgr->inst.addr.nonce);
  1421. ceph_encode_my_addr(msgr);
  1422. atomic_set(&msgr->stopping, 0);
  1423. write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
  1424. dout("%s %p\n", __func__, msgr);
  1425. }
  1426. void ceph_messenger_fini(struct ceph_messenger *msgr)
  1427. {
  1428. put_net(read_pnet(&msgr->net));
  1429. }
  1430. static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
  1431. {
  1432. if (msg->con)
  1433. msg->con->ops->put(msg->con);
  1434. msg->con = con ? con->ops->get(con) : NULL;
  1435. BUG_ON(msg->con != con);
  1436. }
  1437. static void clear_standby(struct ceph_connection *con)
  1438. {
  1439. /* come back from STANDBY? */
  1440. if (con->state == CEPH_CON_S_STANDBY) {
  1441. dout("clear_standby %p\n", con);
  1442. con->state = CEPH_CON_S_PREOPEN;
  1443. if (!ceph_msgr2(from_msgr(con->msgr)))
  1444. con->v1.connect_seq++;
  1445. WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
  1446. WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
  1447. }
  1448. }
  1449. /*
  1450. * Queue up an outgoing message on the given connection.
  1451. *
  1452. * Consumes a ref on @msg.
  1453. */
  1454. void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
  1455. {
  1456. /* set src+dst */
  1457. msg->hdr.src = con->msgr->inst.name;
  1458. BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
  1459. msg->needs_out_seq = true;
  1460. mutex_lock(&con->mutex);
  1461. if (con->state == CEPH_CON_S_CLOSED) {
  1462. dout("con_send %p closed, dropping %p\n", con, msg);
  1463. ceph_msg_put(msg);
  1464. mutex_unlock(&con->mutex);
  1465. return;
  1466. }
  1467. msg_con_set(msg, con);
  1468. BUG_ON(!list_empty(&msg->list_head));
  1469. list_add_tail(&msg->list_head, &con->out_queue);
  1470. dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
  1471. ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
  1472. ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
  1473. le32_to_cpu(msg->hdr.front_len),
  1474. le32_to_cpu(msg->hdr.middle_len),
  1475. le32_to_cpu(msg->hdr.data_len));
  1476. clear_standby(con);
  1477. mutex_unlock(&con->mutex);
  1478. /* if there wasn't anything waiting to send before, queue
  1479. * new work */
  1480. if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
  1481. queue_con(con);
  1482. }
  1483. EXPORT_SYMBOL(ceph_con_send);
  1484. /*
  1485. * Revoke a message that was previously queued for send
  1486. */
  1487. void ceph_msg_revoke(struct ceph_msg *msg)
  1488. {
  1489. struct ceph_connection *con = msg->con;
  1490. if (!con) {
  1491. dout("%s msg %p null con\n", __func__, msg);
  1492. return; /* Message not in our possession */
  1493. }
  1494. mutex_lock(&con->mutex);
  1495. if (list_empty(&msg->list_head)) {
  1496. WARN_ON(con->out_msg == msg);
  1497. dout("%s con %p msg %p not linked\n", __func__, con, msg);
  1498. mutex_unlock(&con->mutex);
  1499. return;
  1500. }
  1501. dout("%s con %p msg %p was linked\n", __func__, con, msg);
  1502. msg->hdr.seq = 0;
  1503. ceph_msg_remove(msg);
  1504. if (con->out_msg == msg) {
  1505. WARN_ON(con->state != CEPH_CON_S_OPEN);
  1506. dout("%s con %p msg %p was sending\n", __func__, con, msg);
  1507. if (ceph_msgr2(from_msgr(con->msgr)))
  1508. ceph_con_v2_revoke(con, msg);
  1509. else
  1510. ceph_con_v1_revoke(con, msg);
  1511. ceph_msg_put(con->out_msg);
  1512. con->out_msg = NULL;
  1513. } else {
  1514. dout("%s con %p msg %p not current, out_msg %p\n", __func__,
  1515. con, msg, con->out_msg);
  1516. }
  1517. mutex_unlock(&con->mutex);
  1518. }
  1519. /*
  1520. * Revoke a message that we may be reading data into
  1521. */
  1522. void ceph_msg_revoke_incoming(struct ceph_msg *msg)
  1523. {
  1524. struct ceph_connection *con = msg->con;
  1525. if (!con) {
  1526. dout("%s msg %p null con\n", __func__, msg);
  1527. return; /* Message not in our possession */
  1528. }
  1529. mutex_lock(&con->mutex);
  1530. if (con->in_msg == msg) {
  1531. WARN_ON(con->state != CEPH_CON_S_OPEN);
  1532. dout("%s con %p msg %p was recving\n", __func__, con, msg);
  1533. if (ceph_msgr2(from_msgr(con->msgr)))
  1534. ceph_con_v2_revoke_incoming(con);
  1535. else
  1536. ceph_con_v1_revoke_incoming(con);
  1537. ceph_msg_put(con->in_msg);
  1538. con->in_msg = NULL;
  1539. } else {
  1540. dout("%s con %p msg %p not current, in_msg %p\n", __func__,
  1541. con, msg, con->in_msg);
  1542. }
  1543. mutex_unlock(&con->mutex);
  1544. }
  1545. /*
  1546. * Queue a keepalive byte to ensure the tcp connection is alive.
  1547. */
  1548. void ceph_con_keepalive(struct ceph_connection *con)
  1549. {
  1550. dout("con_keepalive %p\n", con);
  1551. mutex_lock(&con->mutex);
  1552. clear_standby(con);
  1553. ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
  1554. mutex_unlock(&con->mutex);
  1555. if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
  1556. queue_con(con);
  1557. }
  1558. EXPORT_SYMBOL(ceph_con_keepalive);
  1559. bool ceph_con_keepalive_expired(struct ceph_connection *con,
  1560. unsigned long interval)
  1561. {
  1562. if (interval > 0 &&
  1563. (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
  1564. struct timespec64 now;
  1565. struct timespec64 ts;
  1566. ktime_get_real_ts64(&now);
  1567. jiffies_to_timespec64(interval, &ts);
  1568. ts = timespec64_add(con->last_keepalive_ack, ts);
  1569. return timespec64_compare(&now, &ts) >= 0;
  1570. }
  1571. return false;
  1572. }
  1573. static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
  1574. {
  1575. BUG_ON(msg->num_data_items >= msg->max_data_items);
  1576. return &msg->data[msg->num_data_items++];
  1577. }
  1578. static void ceph_msg_data_destroy(struct ceph_msg_data *data)
  1579. {
  1580. if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
  1581. int num_pages = calc_pages_for(data->alignment, data->length);
  1582. ceph_release_page_vector(data->pages, num_pages);
  1583. } else if (data->type == CEPH_MSG_DATA_PAGELIST) {
  1584. ceph_pagelist_release(data->pagelist);
  1585. }
  1586. }
  1587. void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
  1588. size_t length, size_t alignment, bool own_pages)
  1589. {
  1590. struct ceph_msg_data *data;
  1591. BUG_ON(!pages);
  1592. BUG_ON(!length);
  1593. data = ceph_msg_data_add(msg);
  1594. data->type = CEPH_MSG_DATA_PAGES;
  1595. data->pages = pages;
  1596. data->length = length;
  1597. data->alignment = alignment & ~PAGE_MASK;
  1598. data->own_pages = own_pages;
  1599. msg->data_length += length;
  1600. }
  1601. EXPORT_SYMBOL(ceph_msg_data_add_pages);
  1602. void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
  1603. struct ceph_pagelist *pagelist)
  1604. {
  1605. struct ceph_msg_data *data;
  1606. BUG_ON(!pagelist);
  1607. BUG_ON(!pagelist->length);
  1608. data = ceph_msg_data_add(msg);
  1609. data->type = CEPH_MSG_DATA_PAGELIST;
  1610. refcount_inc(&pagelist->refcnt);
  1611. data->pagelist = pagelist;
  1612. msg->data_length += pagelist->length;
  1613. }
  1614. EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
  #ifdef CONFIG_BLOCK
  /*
   * Attach a bio position to @msg as its next data item.
   *
   * *@bio_pos is copied into the data item, @length is stored as the
   * item's bio_length, and msg->data_length grows by @length.
   */
  void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
                             u32 length)
  {
          struct ceph_msg_data *item = ceph_msg_data_add(msg);

          item->type = CEPH_MSG_DATA_BIO;
          item->bio_length = length;
          item->bio_pos = *bio_pos;

          msg->data_length += length;
  }
  EXPORT_SYMBOL(ceph_msg_data_add_bio);
  #endif /* CONFIG_BLOCK */
  1628. void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
  1629. struct ceph_bvec_iter *bvec_pos)
  1630. {
  1631. struct ceph_msg_data *data;
  1632. data = ceph_msg_data_add(msg);
  1633. data->type = CEPH_MSG_DATA_BVECS;
  1634. data->bvec_pos = *bvec_pos;
  1635. msg->data_length += bvec_pos->iter.bi_size;
  1636. }
  1637. EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
  1638. void ceph_msg_data_add_iter(struct ceph_msg *msg,
  1639. struct iov_iter *iter)
  1640. {
  1641. struct ceph_msg_data *data;
  1642. data = ceph_msg_data_add(msg);
  1643. data->type = CEPH_MSG_DATA_ITER;
  1644. data->iter = *iter;
  1645. msg->data_length += iov_iter_count(&data->iter);
  1646. }
  1647. /*
  1648. * construct a new message with given type, size
  1649. * the new msg has a ref count of 1.
  1650. */
  1651. struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
  1652. gfp_t flags, bool can_fail)
  1653. {
  1654. struct ceph_msg *m;
  1655. m = kmem_cache_zalloc(ceph_msg_cache, flags);
  1656. if (m == NULL)
  1657. goto out;
  1658. m->hdr.type = cpu_to_le16(type);
  1659. m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
  1660. m->hdr.front_len = cpu_to_le32(front_len);
  1661. INIT_LIST_HEAD(&m->list_head);
  1662. kref_init(&m->kref);
  1663. /* front */
  1664. if (front_len) {
  1665. m->front.iov_base = kvmalloc(front_len, flags);
  1666. if (m->front.iov_base == NULL) {
  1667. dout("ceph_msg_new can't allocate %d bytes\n",
  1668. front_len);
  1669. goto out2;
  1670. }
  1671. } else {
  1672. m->front.iov_base = NULL;
  1673. }
  1674. m->front_alloc_len = m->front.iov_len = front_len;
  1675. if (max_data_items) {
  1676. m->data = kmalloc_objs(*m->data, max_data_items, flags);
  1677. if (!m->data)
  1678. goto out2;
  1679. m->max_data_items = max_data_items;
  1680. }
  1681. dout("ceph_msg_new %p front %d\n", m, front_len);
  1682. return m;
  1683. out2:
  1684. ceph_msg_put(m);
  1685. out:
  1686. if (!can_fail) {
  1687. pr_err("msg_new can't create type %d front %d\n", type,
  1688. front_len);
  1689. WARN_ON(1);
  1690. } else {
  1691. dout("msg_new can't create type %d front %d\n", type,
  1692. front_len);
  1693. }
  1694. return NULL;
  1695. }
  1696. EXPORT_SYMBOL(ceph_msg_new2);
  1697. struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
  1698. bool can_fail)
  1699. {
  1700. return ceph_msg_new2(type, front_len, 0, flags, can_fail);
  1701. }
  1702. EXPORT_SYMBOL(ceph_msg_new);
  1703. /*
  1704. * Allocate "middle" portion of a message, if it is needed and wasn't
  1705. * allocated by alloc_msg. This allows us to read a small fixed-size
  1706. * per-type header in the front and then gracefully fail (i.e.,
  1707. * propagate the error to the caller based on info in the front) when
  1708. * the middle is too large.
  1709. */
  1710. static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
  1711. {
  1712. int type = le16_to_cpu(msg->hdr.type);
  1713. int middle_len = le32_to_cpu(msg->hdr.middle_len);
  1714. dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
  1715. ceph_msg_type_name(type), middle_len);
  1716. BUG_ON(!middle_len);
  1717. BUG_ON(msg->middle);
  1718. msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
  1719. if (!msg->middle)
  1720. return -ENOMEM;
  1721. return 0;
  1722. }
  1723. /*
  1724. * Allocate a message for receiving an incoming message on a
  1725. * connection, and save the result in con->in_msg. Uses the
  1726. * connection's private alloc_msg op if available.
  1727. *
  1728. * Returns 0 on success, or a negative error code.
  1729. *
  1730. * On success, if we set *skip = 1:
  1731. * - the next message should be skipped and ignored.
  1732. * - con->in_msg == NULL
  1733. * or if we set *skip = 0:
  1734. * - con->in_msg is non-null.
  1735. * On error (ENOMEM, EAGAIN, ...),
  1736. * - con->in_msg == NULL
  1737. */
  1738. int ceph_con_in_msg_alloc(struct ceph_connection *con,
  1739. struct ceph_msg_header *hdr, int *skip)
  1740. {
  1741. int middle_len = le32_to_cpu(hdr->middle_len);
  1742. struct ceph_msg *msg;
  1743. int ret = 0;
  1744. BUG_ON(con->in_msg != NULL);
  1745. BUG_ON(!con->ops->alloc_msg);
  1746. mutex_unlock(&con->mutex);
  1747. msg = con->ops->alloc_msg(con, hdr, skip);
  1748. mutex_lock(&con->mutex);
  1749. if (con->state != CEPH_CON_S_OPEN) {
  1750. if (msg)
  1751. ceph_msg_put(msg);
  1752. return -EAGAIN;
  1753. }
  1754. if (msg) {
  1755. BUG_ON(*skip);
  1756. msg_con_set(msg, con);
  1757. con->in_msg = msg;
  1758. } else {
  1759. /*
  1760. * Null message pointer means either we should skip
  1761. * this message or we couldn't allocate memory. The
  1762. * former is not an error.
  1763. */
  1764. if (*skip)
  1765. return 0;
  1766. con->error_msg = "error allocating memory for incoming message";
  1767. return -ENOMEM;
  1768. }
  1769. memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr));
  1770. if (middle_len && !con->in_msg->middle) {
  1771. ret = ceph_alloc_middle(con, con->in_msg);
  1772. if (ret < 0) {
  1773. ceph_msg_put(con->in_msg);
  1774. con->in_msg = NULL;
  1775. }
  1776. }
  1777. return ret;
  1778. }
  1779. struct ceph_msg *ceph_con_get_out_msg(struct ceph_connection *con)
  1780. {
  1781. struct ceph_msg *msg;
  1782. if (list_empty(&con->out_queue))
  1783. return NULL;
  1784. msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
  1785. WARN_ON(msg->con != con);
  1786. /*
  1787. * Put the message on "sent" list using a ref from ceph_con_send().
  1788. * It is put when the message is acked or revoked.
  1789. */
  1790. list_move_tail(&msg->list_head, &con->out_sent);
  1791. /*
  1792. * Only assign outgoing seq # if we haven't sent this message
  1793. * yet. If it is requeued, resend with it's original seq.
  1794. */
  1795. if (msg->needs_out_seq) {
  1796. msg->hdr.seq = cpu_to_le64(++con->out_seq);
  1797. msg->needs_out_seq = false;
  1798. if (con->ops->reencode_message)
  1799. con->ops->reencode_message(msg);
  1800. }
  1801. /*
  1802. * Get a ref for out_msg. It is put when we are done sending the
  1803. * message or in case of a fault.
  1804. */
  1805. WARN_ON(con->out_msg);
  1806. return con->out_msg = ceph_msg_get(msg);
  1807. }
  1808. /*
  1809. * Free a generically kmalloc'd message.
  1810. */
  1811. static void ceph_msg_free(struct ceph_msg *m)
  1812. {
  1813. dout("%s %p\n", __func__, m);
  1814. kvfree(m->front.iov_base);
  1815. kfree(m->data);
  1816. kmem_cache_free(ceph_msg_cache, m);
  1817. }
  1818. static void ceph_msg_release(struct kref *kref)
  1819. {
  1820. struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
  1821. int i;
  1822. dout("%s %p\n", __func__, m);
  1823. WARN_ON(!list_empty(&m->list_head));
  1824. msg_con_set(m, NULL);
  1825. /* drop middle, data, if any */
  1826. if (m->middle) {
  1827. ceph_buffer_put(m->middle);
  1828. m->middle = NULL;
  1829. }
  1830. for (i = 0; i < m->num_data_items; i++)
  1831. ceph_msg_data_destroy(&m->data[i]);
  1832. if (m->pool)
  1833. ceph_msgpool_put(m->pool, m);
  1834. else
  1835. ceph_msg_free(m);
  1836. }
  1837. struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
  1838. {
  1839. dout("%s %p (was %d)\n", __func__, msg,
  1840. kref_read(&msg->kref));
  1841. kref_get(&msg->kref);
  1842. return msg;
  1843. }
  1844. EXPORT_SYMBOL(ceph_msg_get);
  1845. void ceph_msg_put(struct ceph_msg *msg)
  1846. {
  1847. dout("%s %p (was %d)\n", __func__, msg,
  1848. kref_read(&msg->kref));
  1849. kref_put(&msg->kref, ceph_msg_release);
  1850. }
  1851. EXPORT_SYMBOL(ceph_msg_put);
  1852. void ceph_msg_dump(struct ceph_msg *msg)
  1853. {
  1854. pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
  1855. msg->front_alloc_len, msg->data_length);
  1856. print_hex_dump(KERN_DEBUG, "header: ",
  1857. DUMP_PREFIX_OFFSET, 16, 1,
  1858. &msg->hdr, sizeof(msg->hdr), true);
  1859. print_hex_dump(KERN_DEBUG, " front: ",
  1860. DUMP_PREFIX_OFFSET, 16, 1,
  1861. msg->front.iov_base, msg->front.iov_len, true);
  1862. if (msg->middle)
  1863. print_hex_dump(KERN_DEBUG, "middle: ",
  1864. DUMP_PREFIX_OFFSET, 16, 1,
  1865. msg->middle->vec.iov_base,
  1866. msg->middle->vec.iov_len, true);
  1867. print_hex_dump(KERN_DEBUG, "footer: ",
  1868. DUMP_PREFIX_OFFSET, 16, 1,
  1869. &msg->footer, sizeof(msg->footer), true);
  1870. }
  1871. EXPORT_SYMBOL(ceph_msg_dump);