messenger_v1.c 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620
  1. // SPDX-License-Identifier: GPL-2.0
  2. #include <linux/ceph/ceph_debug.h>
  3. #include <linux/bvec.h>
  4. #include <linux/crc32c.h>
  5. #include <linux/net.h>
  6. #include <linux/socket.h>
  7. #include <net/sock.h>
  8. #include <linux/ceph/ceph_features.h>
  9. #include <linux/ceph/decode.h>
  10. #include <linux/ceph/libceph.h>
  11. #include <linux/ceph/messenger.h>
  12. /* static tag bytes (protocol control messages) */
  13. static char tag_msg = CEPH_MSGR_TAG_MSG;
  14. static char tag_ack = CEPH_MSGR_TAG_ACK;
  15. static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
  16. static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
  17. /*
  18. * If @buf is NULL, discard up to @len bytes.
  19. */
  20. static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
  21. {
  22. struct kvec iov = {buf, len};
  23. struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  24. int r;
  25. if (!buf)
  26. msg.msg_flags |= MSG_TRUNC;
  27. iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, len);
  28. r = sock_recvmsg(sock, &msg, msg.msg_flags);
  29. if (r == -EAGAIN)
  30. r = 0;
  31. return r;
  32. }
  33. static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
  34. int page_offset, size_t length)
  35. {
  36. struct bio_vec bvec;
  37. struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  38. int r;
  39. BUG_ON(page_offset + length > PAGE_SIZE);
  40. bvec_set_page(&bvec, page, length, page_offset);
  41. iov_iter_bvec(&msg.msg_iter, ITER_DEST, &bvec, 1, length);
  42. r = sock_recvmsg(sock, &msg, msg.msg_flags);
  43. if (r == -EAGAIN)
  44. r = 0;
  45. return r;
  46. }
  47. /*
  48. * write something. @more is true if caller will be sending more data
  49. * shortly.
  50. */
  51. static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
  52. size_t kvlen, size_t len, bool more)
  53. {
  54. struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  55. int r;
  56. if (more)
  57. msg.msg_flags |= MSG_MORE;
  58. else
  59. msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */
  60. r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
  61. if (r == -EAGAIN)
  62. r = 0;
  63. return r;
  64. }
  65. /*
  66. * @more: MSG_MORE or 0.
  67. */
  68. static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
  69. int offset, size_t size, int more)
  70. {
  71. struct msghdr msg = {
  72. .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | more,
  73. };
  74. struct bio_vec bvec;
  75. int ret;
  76. /*
  77. * MSG_SPLICE_PAGES cannot properly handle pages with page_count == 0,
  78. * we need to fall back to sendmsg if that's the case.
  79. *
  80. * Same goes for slab pages: skb_can_coalesce() allows
  81. * coalescing neighboring slab objects into a single frag which
  82. * triggers one of hardened usercopy checks.
  83. */
  84. if (sendpage_ok(page))
  85. msg.msg_flags |= MSG_SPLICE_PAGES;
  86. bvec_set_page(&bvec, page, size, offset);
  87. iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
  88. ret = sock_sendmsg(sock, &msg);
  89. if (ret == -EAGAIN)
  90. ret = 0;
  91. return ret;
  92. }
  93. static void con_out_kvec_reset(struct ceph_connection *con)
  94. {
  95. BUG_ON(con->v1.out_skip);
  96. con->v1.out_kvec_left = 0;
  97. con->v1.out_kvec_bytes = 0;
  98. con->v1.out_kvec_cur = &con->v1.out_kvec[0];
  99. }
  100. static void con_out_kvec_add(struct ceph_connection *con,
  101. size_t size, void *data)
  102. {
  103. int index = con->v1.out_kvec_left;
  104. BUG_ON(con->v1.out_skip);
  105. BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
  106. con->v1.out_kvec[index].iov_len = size;
  107. con->v1.out_kvec[index].iov_base = data;
  108. con->v1.out_kvec_left++;
  109. con->v1.out_kvec_bytes += size;
  110. }
  111. /*
  112. * Chop off a kvec from the end. Return residual number of bytes for
  113. * that kvec, i.e. how many bytes would have been written if the kvec
  114. * hadn't been nuked.
  115. */
  116. static int con_out_kvec_skip(struct ceph_connection *con)
  117. {
  118. int skip = 0;
  119. if (con->v1.out_kvec_bytes > 0) {
  120. skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
  121. BUG_ON(con->v1.out_kvec_bytes < skip);
  122. BUG_ON(!con->v1.out_kvec_left);
  123. con->v1.out_kvec_bytes -= skip;
  124. con->v1.out_kvec_left--;
  125. }
  126. return skip;
  127. }
  128. static size_t sizeof_footer(struct ceph_connection *con)
  129. {
  130. return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
  131. sizeof(struct ceph_msg_footer) :
  132. sizeof(struct ceph_msg_footer_old);
  133. }
  134. static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
  135. {
  136. /* Initialize data cursor if it's not a sparse read */
  137. u64 len = msg->sparse_read_total ? : data_len;
  138. ceph_msg_data_cursor_init(&msg->cursor, msg, len);
  139. }
  140. /*
  141. * Prepare footer for currently outgoing message, and finish things
  142. * off. Assumes out_kvec* are already valid.. we just add on to the end.
  143. */
  144. static void prepare_write_message_footer(struct ceph_connection *con,
  145. struct ceph_msg *m)
  146. {
  147. m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
  148. dout("prepare_write_message_footer %p\n", con);
  149. con_out_kvec_add(con, sizeof_footer(con), &m->footer);
  150. if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
  151. if (con->ops->sign_message)
  152. con->ops->sign_message(m);
  153. else
  154. m->footer.sig = 0;
  155. } else {
  156. m->old_footer.flags = m->footer.flags;
  157. }
  158. con->v1.out_more = m->more_to_follow;
  159. con->v1.out_msg_done = true;
  160. }
  161. /*
  162. * Prepare headers for the next outgoing message.
  163. */
  164. static void prepare_write_message(struct ceph_connection *con,
  165. struct ceph_msg *m)
  166. {
  167. u32 crc;
  168. con_out_kvec_reset(con);
  169. con->v1.out_msg_done = false;
  170. /* Sneak an ack in there first? If we can get it into the same
  171. * TCP packet that's a good thing. */
  172. if (con->in_seq > con->in_seq_acked) {
  173. con->in_seq_acked = con->in_seq;
  174. con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
  175. con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
  176. con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
  177. &con->v1.out_temp_ack);
  178. }
  179. dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
  180. m, con->out_seq, le16_to_cpu(m->hdr.type),
  181. le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
  182. m->data_length);
  183. WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
  184. WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
  185. /* tag + hdr + front + middle */
  186. con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
  187. con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
  188. con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
  189. if (m->middle)
  190. con_out_kvec_add(con, m->middle->vec.iov_len,
  191. m->middle->vec.iov_base);
  192. /* fill in hdr crc and finalize hdr */
  193. crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
  194. m->hdr.crc = cpu_to_le32(crc);
  195. memcpy(&con->v1.out_hdr, &m->hdr, sizeof(con->v1.out_hdr));
  196. /* fill in front and middle crc, footer */
  197. crc = crc32c(0, m->front.iov_base, m->front.iov_len);
  198. m->footer.front_crc = cpu_to_le32(crc);
  199. if (m->middle) {
  200. crc = crc32c(0, m->middle->vec.iov_base,
  201. m->middle->vec.iov_len);
  202. m->footer.middle_crc = cpu_to_le32(crc);
  203. } else
  204. m->footer.middle_crc = 0;
  205. dout("%s front_crc %u middle_crc %u\n", __func__,
  206. le32_to_cpu(m->footer.front_crc),
  207. le32_to_cpu(m->footer.middle_crc));
  208. m->footer.flags = 0;
  209. /* is there a data payload? */
  210. m->footer.data_crc = 0;
  211. if (m->data_length) {
  212. prepare_message_data(m, m->data_length);
  213. con->v1.out_more = 1; /* data + footer will follow */
  214. } else {
  215. /* no, queue up footer too and be done */
  216. prepare_write_message_footer(con, m);
  217. }
  218. ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
  219. }
  220. /*
  221. * Prepare an ack.
  222. */
  223. static void prepare_write_ack(struct ceph_connection *con)
  224. {
  225. dout("prepare_write_ack %p %llu -> %llu\n", con,
  226. con->in_seq_acked, con->in_seq);
  227. con->in_seq_acked = con->in_seq;
  228. con_out_kvec_reset(con);
  229. con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
  230. con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
  231. con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
  232. &con->v1.out_temp_ack);
  233. con->v1.out_more = 1; /* more will follow.. eventually.. */
  234. ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
  235. }
  236. /*
  237. * Prepare to share the seq during handshake
  238. */
  239. static void prepare_write_seq(struct ceph_connection *con)
  240. {
  241. dout("prepare_write_seq %p %llu -> %llu\n", con,
  242. con->in_seq_acked, con->in_seq);
  243. con->in_seq_acked = con->in_seq;
  244. con_out_kvec_reset(con);
  245. con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
  246. con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
  247. &con->v1.out_temp_ack);
  248. ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
  249. }
  250. /*
  251. * Prepare to write keepalive byte.
  252. */
  253. static void prepare_write_keepalive(struct ceph_connection *con)
  254. {
  255. dout("prepare_write_keepalive %p\n", con);
  256. con_out_kvec_reset(con);
  257. if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
  258. struct timespec64 now;
  259. ktime_get_real_ts64(&now);
  260. con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
  261. ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
  262. con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
  263. &con->v1.out_temp_keepalive2);
  264. } else {
  265. con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
  266. }
  267. ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
  268. }
  269. /*
  270. * Connection negotiation.
  271. */
  272. static int get_connect_authorizer(struct ceph_connection *con)
  273. {
  274. struct ceph_auth_handshake *auth;
  275. int auth_proto;
  276. if (!con->ops->get_authorizer) {
  277. con->v1.auth = NULL;
  278. con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
  279. con->v1.out_connect.authorizer_len = 0;
  280. return 0;
  281. }
  282. auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
  283. if (IS_ERR(auth))
  284. return PTR_ERR(auth);
  285. con->v1.auth = auth;
  286. con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
  287. con->v1.out_connect.authorizer_len =
  288. cpu_to_le32(auth->authorizer_buf_len);
  289. return 0;
  290. }
  291. /*
  292. * We connected to a peer and are saying hello.
  293. */
  294. static void prepare_write_banner(struct ceph_connection *con)
  295. {
  296. con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
  297. con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
  298. &con->msgr->my_enc_addr);
  299. con->v1.out_more = 0;
  300. ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
  301. }
  302. static void __prepare_write_connect(struct ceph_connection *con)
  303. {
  304. con_out_kvec_add(con, sizeof(con->v1.out_connect),
  305. &con->v1.out_connect);
  306. if (con->v1.auth)
  307. con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
  308. con->v1.auth->authorizer_buf);
  309. con->v1.out_more = 0;
  310. ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
  311. }
  312. static int prepare_write_connect(struct ceph_connection *con)
  313. {
  314. unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
  315. int proto;
  316. int ret;
  317. switch (con->peer_name.type) {
  318. case CEPH_ENTITY_TYPE_MON:
  319. proto = CEPH_MONC_PROTOCOL;
  320. break;
  321. case CEPH_ENTITY_TYPE_OSD:
  322. proto = CEPH_OSDC_PROTOCOL;
  323. break;
  324. case CEPH_ENTITY_TYPE_MDS:
  325. proto = CEPH_MDSC_PROTOCOL;
  326. break;
  327. default:
  328. BUG();
  329. }
  330. dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
  331. con->v1.connect_seq, global_seq, proto);
  332. con->v1.out_connect.features =
  333. cpu_to_le64(from_msgr(con->msgr)->supported_features);
  334. con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
  335. con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
  336. con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
  337. con->v1.out_connect.protocol_version = cpu_to_le32(proto);
  338. con->v1.out_connect.flags = 0;
  339. ret = get_connect_authorizer(con);
  340. if (ret)
  341. return ret;
  342. __prepare_write_connect(con);
  343. return 0;
  344. }
  345. /*
  346. * write as much of pending kvecs to the socket as we can.
  347. * 1 -> done
  348. * 0 -> socket full, but more to do
  349. * <0 -> error
  350. */
  351. static int write_partial_kvec(struct ceph_connection *con)
  352. {
  353. int ret;
  354. dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
  355. while (con->v1.out_kvec_bytes > 0) {
  356. ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
  357. con->v1.out_kvec_left,
  358. con->v1.out_kvec_bytes,
  359. con->v1.out_more);
  360. if (ret <= 0)
  361. goto out;
  362. con->v1.out_kvec_bytes -= ret;
  363. if (!con->v1.out_kvec_bytes)
  364. break; /* done */
  365. /* account for full iov entries consumed */
  366. while (ret >= con->v1.out_kvec_cur->iov_len) {
  367. BUG_ON(!con->v1.out_kvec_left);
  368. ret -= con->v1.out_kvec_cur->iov_len;
  369. con->v1.out_kvec_cur++;
  370. con->v1.out_kvec_left--;
  371. }
  372. /* and for a partially-consumed entry */
  373. if (ret) {
  374. con->v1.out_kvec_cur->iov_len -= ret;
  375. con->v1.out_kvec_cur->iov_base += ret;
  376. }
  377. }
  378. con->v1.out_kvec_left = 0;
  379. ret = 1;
  380. out:
  381. dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
  382. con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
  383. return ret; /* done! */
  384. }
  385. /*
  386. * Write as much message data payload as we can. If we finish, queue
  387. * up the footer.
  388. * 1 -> done, footer is now queued in out_kvec[].
  389. * 0 -> socket full, but more to do
  390. * <0 -> error
  391. */
  392. static int write_partial_message_data(struct ceph_connection *con,
  393. struct ceph_msg *msg)
  394. {
  395. struct ceph_msg_data_cursor *cursor = &msg->cursor;
  396. bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
  397. u32 crc;
  398. dout("%s %p msg %p\n", __func__, con, msg);
  399. if (!msg->num_data_items)
  400. return -EINVAL;
  401. /*
  402. * Iterate through each page that contains data to be
  403. * written, and send as much as possible for each.
  404. *
  405. * If we are calculating the data crc (the default), we will
  406. * need to map the page. If we have no pages, they have
  407. * been revoked, so use the zero page.
  408. */
  409. crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
  410. while (cursor->total_resid) {
  411. struct page *page;
  412. size_t page_offset;
  413. size_t length;
  414. int ret;
  415. if (!cursor->resid) {
  416. ceph_msg_data_advance(cursor, 0);
  417. continue;
  418. }
  419. page = ceph_msg_data_next(cursor, &page_offset, &length);
  420. ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
  421. MSG_MORE);
  422. if (ret <= 0) {
  423. if (do_datacrc)
  424. msg->footer.data_crc = cpu_to_le32(crc);
  425. return ret;
  426. }
  427. if (do_datacrc && cursor->need_crc)
  428. crc = ceph_crc32c_page(crc, page, page_offset, length);
  429. ceph_msg_data_advance(cursor, (size_t)ret);
  430. }
  431. dout("%s %p msg %p done\n", __func__, con, msg);
  432. /* prepare and queue up footer, too */
  433. if (do_datacrc)
  434. msg->footer.data_crc = cpu_to_le32(crc);
  435. else
  436. msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
  437. con_out_kvec_reset(con);
  438. prepare_write_message_footer(con, msg);
  439. return 1; /* must return > 0 to indicate success */
  440. }
  441. /*
  442. * write some zeros
  443. */
  444. static int write_partial_skip(struct ceph_connection *con)
  445. {
  446. int ret;
  447. dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
  448. while (con->v1.out_skip > 0) {
  449. size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
  450. ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
  451. MSG_MORE);
  452. if (ret <= 0)
  453. goto out;
  454. con->v1.out_skip -= ret;
  455. }
  456. ret = 1;
  457. out:
  458. return ret;
  459. }
  460. /*
  461. * Prepare to read connection handshake, or an ack.
  462. */
  463. static void prepare_read_banner(struct ceph_connection *con)
  464. {
  465. dout("prepare_read_banner %p\n", con);
  466. con->v1.in_base_pos = 0;
  467. }
  468. static void prepare_read_connect(struct ceph_connection *con)
  469. {
  470. dout("prepare_read_connect %p\n", con);
  471. con->v1.in_base_pos = 0;
  472. }
  473. static void prepare_read_ack(struct ceph_connection *con)
  474. {
  475. dout("prepare_read_ack %p\n", con);
  476. con->v1.in_base_pos = 0;
  477. }
  478. static void prepare_read_seq(struct ceph_connection *con)
  479. {
  480. dout("prepare_read_seq %p\n", con);
  481. con->v1.in_base_pos = 0;
  482. con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
  483. }
  484. static void prepare_read_tag(struct ceph_connection *con)
  485. {
  486. dout("prepare_read_tag %p\n", con);
  487. con->v1.in_base_pos = 0;
  488. con->v1.in_tag = CEPH_MSGR_TAG_READY;
  489. }
  490. static void prepare_read_keepalive_ack(struct ceph_connection *con)
  491. {
  492. dout("prepare_read_keepalive_ack %p\n", con);
  493. con->v1.in_base_pos = 0;
  494. }
  495. /*
  496. * Prepare to read a message.
  497. */
  498. static int prepare_read_message(struct ceph_connection *con)
  499. {
  500. dout("prepare_read_message %p\n", con);
  501. BUG_ON(con->in_msg != NULL);
  502. con->v1.in_base_pos = 0;
  503. con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
  504. return 0;
  505. }
  506. static int read_partial(struct ceph_connection *con,
  507. int end, int size, void *object)
  508. {
  509. while (con->v1.in_base_pos < end) {
  510. int left = end - con->v1.in_base_pos;
  511. int have = size - left;
  512. int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
  513. if (ret <= 0)
  514. return ret;
  515. con->v1.in_base_pos += ret;
  516. }
  517. return 1;
  518. }
  519. /*
  520. * Read all or part of the connect-side handshake on a new connection
  521. */
  522. static int read_partial_banner(struct ceph_connection *con)
  523. {
  524. int size;
  525. int end;
  526. int ret;
  527. dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
  528. /* peer's banner */
  529. size = strlen(CEPH_BANNER);
  530. end = size;
  531. ret = read_partial(con, end, size, con->v1.in_banner);
  532. if (ret <= 0)
  533. goto out;
  534. size = sizeof(con->v1.actual_peer_addr);
  535. end += size;
  536. ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
  537. if (ret <= 0)
  538. goto out;
  539. ceph_decode_banner_addr(&con->v1.actual_peer_addr);
  540. size = sizeof(con->v1.peer_addr_for_me);
  541. end += size;
  542. ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
  543. if (ret <= 0)
  544. goto out;
  545. ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
  546. out:
  547. return ret;
  548. }
  549. static int read_partial_connect(struct ceph_connection *con)
  550. {
  551. int size;
  552. int end;
  553. int ret;
  554. dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
  555. size = sizeof(con->v1.in_reply);
  556. end = size;
  557. ret = read_partial(con, end, size, &con->v1.in_reply);
  558. if (ret <= 0)
  559. goto out;
  560. if (con->v1.auth) {
  561. size = le32_to_cpu(con->v1.in_reply.authorizer_len);
  562. if (size > con->v1.auth->authorizer_reply_buf_len) {
  563. pr_err("authorizer reply too big: %d > %zu\n", size,
  564. con->v1.auth->authorizer_reply_buf_len);
  565. ret = -EINVAL;
  566. goto out;
  567. }
  568. end += size;
  569. ret = read_partial(con, end, size,
  570. con->v1.auth->authorizer_reply_buf);
  571. if (ret <= 0)
  572. goto out;
  573. }
  574. dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
  575. con, con->v1.in_reply.tag,
  576. le32_to_cpu(con->v1.in_reply.connect_seq),
  577. le32_to_cpu(con->v1.in_reply.global_seq));
  578. out:
  579. return ret;
  580. }
  581. /*
  582. * Verify the hello banner looks okay.
  583. */
  584. static int verify_hello(struct ceph_connection *con)
  585. {
  586. if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
  587. pr_err("connect to %s got bad banner\n",
  588. ceph_pr_addr(&con->peer_addr));
  589. con->error_msg = "protocol error, bad banner";
  590. return -1;
  591. }
  592. return 0;
  593. }
  594. static int process_banner(struct ceph_connection *con)
  595. {
  596. struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
  597. dout("process_banner on %p\n", con);
  598. if (verify_hello(con) < 0)
  599. return -1;
  600. /*
  601. * Make sure the other end is who we wanted. note that the other
  602. * end may not yet know their ip address, so if it's 0.0.0.0, give
  603. * them the benefit of the doubt.
  604. */
  605. if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
  606. sizeof(con->peer_addr)) != 0 &&
  607. !(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
  608. con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
  609. pr_warn("wrong peer, want %s/%u, got %s/%u\n",
  610. ceph_pr_addr(&con->peer_addr),
  611. le32_to_cpu(con->peer_addr.nonce),
  612. ceph_pr_addr(&con->v1.actual_peer_addr),
  613. le32_to_cpu(con->v1.actual_peer_addr.nonce));
  614. con->error_msg = "wrong peer at address";
  615. return -1;
  616. }
  617. /*
  618. * did we learn our address?
  619. */
  620. if (ceph_addr_is_blank(my_addr)) {
  621. memcpy(&my_addr->in_addr,
  622. &con->v1.peer_addr_for_me.in_addr,
  623. sizeof(con->v1.peer_addr_for_me.in_addr));
  624. ceph_addr_set_port(my_addr, 0);
  625. ceph_encode_my_addr(con->msgr);
  626. dout("process_banner learned my addr is %s\n",
  627. ceph_pr_addr(my_addr));
  628. }
  629. return 0;
  630. }
  631. static int process_connect(struct ceph_connection *con)
  632. {
  633. u64 sup_feat = from_msgr(con->msgr)->supported_features;
  634. u64 req_feat = from_msgr(con->msgr)->required_features;
  635. u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
  636. int ret;
  637. dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
  638. if (con->v1.auth) {
  639. int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
  640. /*
  641. * Any connection that defines ->get_authorizer()
  642. * should also define ->add_authorizer_challenge() and
  643. * ->verify_authorizer_reply().
  644. *
  645. * See get_connect_authorizer().
  646. */
  647. if (con->v1.in_reply.tag ==
  648. CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
  649. ret = con->ops->add_authorizer_challenge(
  650. con, con->v1.auth->authorizer_reply_buf, len);
  651. if (ret < 0)
  652. return ret;
  653. con_out_kvec_reset(con);
  654. __prepare_write_connect(con);
  655. prepare_read_connect(con);
  656. return 0;
  657. }
  658. if (len) {
  659. ret = con->ops->verify_authorizer_reply(con);
  660. if (ret < 0) {
  661. con->error_msg = "bad authorize reply";
  662. return ret;
  663. }
  664. }
  665. }
  666. switch (con->v1.in_reply.tag) {
  667. case CEPH_MSGR_TAG_FEATURES:
  668. pr_err("%s%lld %s feature set mismatch,"
  669. " my %llx < server's %llx, missing %llx\n",
  670. ENTITY_NAME(con->peer_name),
  671. ceph_pr_addr(&con->peer_addr),
  672. sup_feat, server_feat, server_feat & ~sup_feat);
  673. con->error_msg = "missing required protocol features";
  674. return -1;
  675. case CEPH_MSGR_TAG_BADPROTOVER:
  676. pr_err("%s%lld %s protocol version mismatch,"
  677. " my %d != server's %d\n",
  678. ENTITY_NAME(con->peer_name),
  679. ceph_pr_addr(&con->peer_addr),
  680. le32_to_cpu(con->v1.out_connect.protocol_version),
  681. le32_to_cpu(con->v1.in_reply.protocol_version));
  682. con->error_msg = "protocol version mismatch";
  683. return -1;
  684. case CEPH_MSGR_TAG_BADAUTHORIZER:
  685. con->v1.auth_retry++;
  686. dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
  687. con->v1.auth_retry);
  688. if (con->v1.auth_retry == 2) {
  689. con->error_msg = "connect authorization failure";
  690. return -1;
  691. }
  692. con_out_kvec_reset(con);
  693. ret = prepare_write_connect(con);
  694. if (ret < 0)
  695. return ret;
  696. prepare_read_connect(con);
  697. break;
  698. case CEPH_MSGR_TAG_RESETSESSION:
  699. /*
  700. * If we connected with a large connect_seq but the peer
  701. * has no record of a session with us (no connection, or
  702. * connect_seq == 0), they will send RESETSESION to indicate
  703. * that they must have reset their session, and may have
  704. * dropped messages.
  705. */
  706. dout("process_connect got RESET peer seq %u\n",
  707. le32_to_cpu(con->v1.in_reply.connect_seq));
  708. pr_info("%s%lld %s session reset\n",
  709. ENTITY_NAME(con->peer_name),
  710. ceph_pr_addr(&con->peer_addr));
  711. ceph_con_reset_session(con);
  712. con_out_kvec_reset(con);
  713. ret = prepare_write_connect(con);
  714. if (ret < 0)
  715. return ret;
  716. prepare_read_connect(con);
  717. /* Tell ceph about it. */
  718. mutex_unlock(&con->mutex);
  719. if (con->ops->peer_reset)
  720. con->ops->peer_reset(con);
  721. mutex_lock(&con->mutex);
  722. if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
  723. return -EAGAIN;
  724. break;
  725. case CEPH_MSGR_TAG_RETRY_SESSION:
  726. /*
  727. * If we sent a smaller connect_seq than the peer has, try
  728. * again with a larger value.
  729. */
  730. dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
  731. le32_to_cpu(con->v1.out_connect.connect_seq),
  732. le32_to_cpu(con->v1.in_reply.connect_seq));
  733. con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
  734. con_out_kvec_reset(con);
  735. ret = prepare_write_connect(con);
  736. if (ret < 0)
  737. return ret;
  738. prepare_read_connect(con);
  739. break;
  740. case CEPH_MSGR_TAG_RETRY_GLOBAL:
  741. /*
  742. * If we sent a smaller global_seq than the peer has, try
  743. * again with a larger value.
  744. */
  745. dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
  746. con->v1.peer_global_seq,
  747. le32_to_cpu(con->v1.in_reply.global_seq));
  748. ceph_get_global_seq(con->msgr,
  749. le32_to_cpu(con->v1.in_reply.global_seq));
  750. con_out_kvec_reset(con);
  751. ret = prepare_write_connect(con);
  752. if (ret < 0)
  753. return ret;
  754. prepare_read_connect(con);
  755. break;
  756. case CEPH_MSGR_TAG_SEQ:
  757. case CEPH_MSGR_TAG_READY:
  758. if (req_feat & ~server_feat) {
  759. pr_err("%s%lld %s protocol feature mismatch,"
  760. " my required %llx > server's %llx, need %llx\n",
  761. ENTITY_NAME(con->peer_name),
  762. ceph_pr_addr(&con->peer_addr),
  763. req_feat, server_feat, req_feat & ~server_feat);
  764. con->error_msg = "missing required protocol features";
  765. return -1;
  766. }
  767. WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
  768. con->state = CEPH_CON_S_OPEN;
  769. con->v1.auth_retry = 0; /* we authenticated; clear flag */
  770. con->v1.peer_global_seq =
  771. le32_to_cpu(con->v1.in_reply.global_seq);
  772. con->v1.connect_seq++;
  773. con->peer_features = server_feat;
  774. dout("process_connect got READY gseq %d cseq %d (%d)\n",
  775. con->v1.peer_global_seq,
  776. le32_to_cpu(con->v1.in_reply.connect_seq),
  777. con->v1.connect_seq);
  778. WARN_ON(con->v1.connect_seq !=
  779. le32_to_cpu(con->v1.in_reply.connect_seq));
  780. if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
  781. ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
  782. con->delay = 0; /* reset backoff memory */
  783. if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
  784. prepare_write_seq(con);
  785. prepare_read_seq(con);
  786. } else {
  787. prepare_read_tag(con);
  788. }
  789. break;
  790. case CEPH_MSGR_TAG_WAIT:
  791. /*
  792. * If there is a connection race (we are opening
  793. * connections to each other), one of us may just have
  794. * to WAIT. This shouldn't happen if we are the
  795. * client.
  796. */
  797. con->error_msg = "protocol error, got WAIT as client";
  798. return -1;
  799. default:
  800. con->error_msg = "protocol error, garbage tag during connect";
  801. return -1;
  802. }
  803. return 0;
  804. }
  805. /*
  806. * read (part of) an ack
  807. */
  808. static int read_partial_ack(struct ceph_connection *con)
  809. {
  810. int size = sizeof(con->v1.in_temp_ack);
  811. int end = size;
  812. return read_partial(con, end, size, &con->v1.in_temp_ack);
  813. }
  814. /*
  815. * We can finally discard anything that's been acked.
  816. */
  817. static void process_ack(struct ceph_connection *con)
  818. {
  819. u64 ack = le64_to_cpu(con->v1.in_temp_ack);
  820. if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
  821. ceph_con_discard_sent(con, ack);
  822. else
  823. ceph_con_discard_requeued(con, ack);
  824. prepare_read_tag(con);
  825. }
  826. static int read_partial_message_chunk(struct ceph_connection *con,
  827. struct kvec *section,
  828. unsigned int sec_len, u32 *crc)
  829. {
  830. int ret, left;
  831. BUG_ON(!section);
  832. while (section->iov_len < sec_len) {
  833. BUG_ON(section->iov_base == NULL);
  834. left = sec_len - section->iov_len;
  835. ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
  836. section->iov_len, left);
  837. if (ret <= 0)
  838. return ret;
  839. section->iov_len += ret;
  840. }
  841. if (section->iov_len == sec_len)
  842. *crc = crc32c(*crc, section->iov_base, section->iov_len);
  843. return 1;
  844. }
  845. static inline int read_partial_message_section(struct ceph_connection *con,
  846. struct kvec *section,
  847. unsigned int sec_len, u32 *crc)
  848. {
  849. *crc = 0;
  850. return read_partial_message_chunk(con, section, sec_len, crc);
  851. }
  852. static int read_partial_sparse_msg_extent(struct ceph_connection *con, u32 *crc)
  853. {
  854. struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
  855. bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE);
  856. if (do_bounce && unlikely(!con->bounce_page)) {
  857. con->bounce_page = alloc_page(GFP_NOIO);
  858. if (!con->bounce_page) {
  859. pr_err("failed to allocate bounce page\n");
  860. return -ENOMEM;
  861. }
  862. }
  863. while (cursor->sr_resid > 0) {
  864. struct page *page, *rpage;
  865. size_t off, len;
  866. int ret;
  867. page = ceph_msg_data_next(cursor, &off, &len);
  868. rpage = do_bounce ? con->bounce_page : page;
  869. /* clamp to what remains in extent */
  870. len = min_t(int, len, cursor->sr_resid);
  871. ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len);
  872. if (ret <= 0)
  873. return ret;
  874. *crc = ceph_crc32c_page(*crc, rpage, off, ret);
  875. ceph_msg_data_advance(cursor, (size_t)ret);
  876. cursor->sr_resid -= ret;
  877. if (do_bounce)
  878. memcpy_page(page, off, rpage, off, ret);
  879. }
  880. return 1;
  881. }
  882. static int read_partial_sparse_msg_data(struct ceph_connection *con)
  883. {
  884. struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
  885. bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
  886. u32 crc = 0;
  887. int ret = 1;
  888. if (do_datacrc)
  889. crc = con->in_data_crc;
  890. while (cursor->total_resid) {
  891. if (con->v1.in_sr_kvec.iov_base)
  892. ret = read_partial_message_chunk(con,
  893. &con->v1.in_sr_kvec,
  894. con->v1.in_sr_len,
  895. &crc);
  896. else if (cursor->sr_resid > 0)
  897. ret = read_partial_sparse_msg_extent(con, &crc);
  898. if (ret <= 0)
  899. break;
  900. memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec));
  901. ret = con->ops->sparse_read(con, cursor,
  902. (char **)&con->v1.in_sr_kvec.iov_base);
  903. if (ret <= 0) {
  904. ret = ret ? ret : 1; /* must return > 0 to indicate success */
  905. break;
  906. }
  907. con->v1.in_sr_len = ret;
  908. }
  909. if (do_datacrc)
  910. con->in_data_crc = crc;
  911. return ret;
  912. }
  913. static int read_partial_msg_data(struct ceph_connection *con)
  914. {
  915. struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
  916. bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
  917. struct page *page;
  918. size_t page_offset;
  919. size_t length;
  920. u32 crc = 0;
  921. int ret;
  922. if (do_datacrc)
  923. crc = con->in_data_crc;
  924. while (cursor->total_resid) {
  925. if (!cursor->resid) {
  926. ceph_msg_data_advance(cursor, 0);
  927. continue;
  928. }
  929. page = ceph_msg_data_next(cursor, &page_offset, &length);
  930. ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
  931. if (ret <= 0) {
  932. if (do_datacrc)
  933. con->in_data_crc = crc;
  934. return ret;
  935. }
  936. if (do_datacrc)
  937. crc = ceph_crc32c_page(crc, page, page_offset, ret);
  938. ceph_msg_data_advance(cursor, (size_t)ret);
  939. }
  940. if (do_datacrc)
  941. con->in_data_crc = crc;
  942. return 1; /* must return > 0 to indicate success */
  943. }
  944. static int read_partial_msg_data_bounce(struct ceph_connection *con)
  945. {
  946. struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
  947. struct page *page;
  948. size_t off, len;
  949. u32 crc;
  950. int ret;
  951. if (unlikely(!con->bounce_page)) {
  952. con->bounce_page = alloc_page(GFP_NOIO);
  953. if (!con->bounce_page) {
  954. pr_err("failed to allocate bounce page\n");
  955. return -ENOMEM;
  956. }
  957. }
  958. crc = con->in_data_crc;
  959. while (cursor->total_resid) {
  960. if (!cursor->resid) {
  961. ceph_msg_data_advance(cursor, 0);
  962. continue;
  963. }
  964. page = ceph_msg_data_next(cursor, &off, &len);
  965. ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
  966. if (ret <= 0) {
  967. con->in_data_crc = crc;
  968. return ret;
  969. }
  970. crc = crc32c(crc, page_address(con->bounce_page), ret);
  971. memcpy_to_page(page, off, page_address(con->bounce_page), ret);
  972. ceph_msg_data_advance(cursor, ret);
  973. }
  974. con->in_data_crc = crc;
  975. return 1; /* must return > 0 to indicate success */
  976. }
  977. /*
  978. * read (part of) a message.
  979. */
  980. static int read_partial_message(struct ceph_connection *con)
  981. {
  982. struct ceph_msg *m = con->in_msg;
  983. int size;
  984. int end;
  985. int ret;
  986. unsigned int front_len, middle_len, data_len;
  987. bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
  988. bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
  989. u64 seq;
  990. u32 crc;
  991. dout("read_partial_message con %p msg %p\n", con, m);
  992. /* header */
  993. size = sizeof(con->v1.in_hdr);
  994. end = size;
  995. ret = read_partial(con, end, size, &con->v1.in_hdr);
  996. if (ret <= 0)
  997. return ret;
  998. crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
  999. if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
  1000. pr_err("read_partial_message bad hdr crc %u != expected %u\n",
  1001. crc, con->v1.in_hdr.crc);
  1002. return -EBADMSG;
  1003. }
  1004. front_len = le32_to_cpu(con->v1.in_hdr.front_len);
  1005. if (front_len > CEPH_MSG_MAX_FRONT_LEN)
  1006. return -EIO;
  1007. middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
  1008. if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
  1009. return -EIO;
  1010. data_len = le32_to_cpu(con->v1.in_hdr.data_len);
  1011. if (data_len > CEPH_MSG_MAX_DATA_LEN)
  1012. return -EIO;
  1013. /* verify seq# */
  1014. seq = le64_to_cpu(con->v1.in_hdr.seq);
  1015. if ((s64)seq - (s64)con->in_seq < 1) {
  1016. pr_info("skipping %s%lld %s seq %lld expected %lld\n",
  1017. ENTITY_NAME(con->peer_name),
  1018. ceph_pr_addr(&con->peer_addr),
  1019. seq, con->in_seq + 1);
  1020. con->v1.in_base_pos = -front_len - middle_len - data_len -
  1021. sizeof_footer(con);
  1022. con->v1.in_tag = CEPH_MSGR_TAG_READY;
  1023. return 1;
  1024. } else if ((s64)seq - (s64)con->in_seq > 1) {
  1025. pr_err("read_partial_message bad seq %lld expected %lld\n",
  1026. seq, con->in_seq + 1);
  1027. con->error_msg = "bad message sequence # for incoming message";
  1028. return -EBADE;
  1029. }
  1030. /* allocate message? */
  1031. if (!con->in_msg) {
  1032. int skip = 0;
  1033. dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
  1034. front_len, data_len);
  1035. ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
  1036. if (ret < 0)
  1037. return ret;
  1038. BUG_ON((!con->in_msg) ^ skip);
  1039. if (skip) {
  1040. /* skip this message */
  1041. dout("alloc_msg said skip message\n");
  1042. con->v1.in_base_pos = -front_len - middle_len -
  1043. data_len - sizeof_footer(con);
  1044. con->v1.in_tag = CEPH_MSGR_TAG_READY;
  1045. con->in_seq++;
  1046. return 1;
  1047. }
  1048. BUG_ON(!con->in_msg);
  1049. BUG_ON(con->in_msg->con != con);
  1050. m = con->in_msg;
  1051. m->front.iov_len = 0; /* haven't read it yet */
  1052. if (m->middle)
  1053. m->middle->vec.iov_len = 0;
  1054. /* prepare for data payload, if any */
  1055. if (data_len)
  1056. prepare_message_data(con->in_msg, data_len);
  1057. }
  1058. /* front */
  1059. ret = read_partial_message_section(con, &m->front, front_len,
  1060. &con->in_front_crc);
  1061. if (ret <= 0)
  1062. return ret;
  1063. /* middle */
  1064. if (m->middle) {
  1065. ret = read_partial_message_section(con, &m->middle->vec,
  1066. middle_len,
  1067. &con->in_middle_crc);
  1068. if (ret <= 0)
  1069. return ret;
  1070. }
  1071. /* (page) data */
  1072. if (data_len) {
  1073. if (!m->num_data_items)
  1074. return -EIO;
  1075. if (m->sparse_read_total)
  1076. ret = read_partial_sparse_msg_data(con);
  1077. else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
  1078. ret = read_partial_msg_data_bounce(con);
  1079. else
  1080. ret = read_partial_msg_data(con);
  1081. if (ret <= 0)
  1082. return ret;
  1083. }
  1084. /* footer */
  1085. size = sizeof_footer(con);
  1086. end += size;
  1087. ret = read_partial(con, end, size, &m->footer);
  1088. if (ret <= 0)
  1089. return ret;
  1090. if (!need_sign) {
  1091. m->footer.flags = m->old_footer.flags;
  1092. m->footer.sig = 0;
  1093. }
  1094. dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
  1095. m, front_len, m->footer.front_crc, middle_len,
  1096. m->footer.middle_crc, data_len, m->footer.data_crc);
  1097. /* crc ok? */
  1098. if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
  1099. pr_err("read_partial_message %p front crc %u != exp. %u\n",
  1100. m, con->in_front_crc, m->footer.front_crc);
  1101. return -EBADMSG;
  1102. }
  1103. if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
  1104. pr_err("read_partial_message %p middle crc %u != exp %u\n",
  1105. m, con->in_middle_crc, m->footer.middle_crc);
  1106. return -EBADMSG;
  1107. }
  1108. if (do_datacrc &&
  1109. (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
  1110. con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
  1111. pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
  1112. con->in_data_crc, le32_to_cpu(m->footer.data_crc));
  1113. return -EBADMSG;
  1114. }
  1115. if (need_sign && con->ops->check_message_signature &&
  1116. con->ops->check_message_signature(m)) {
  1117. pr_err("read_partial_message %p signature check failed\n", m);
  1118. return -EBADMSG;
  1119. }
  1120. return 1; /* done! */
  1121. }
  1122. static int read_keepalive_ack(struct ceph_connection *con)
  1123. {
  1124. struct ceph_timespec ceph_ts;
  1125. size_t size = sizeof(ceph_ts);
  1126. int ret = read_partial(con, size, size, &ceph_ts);
  1127. if (ret <= 0)
  1128. return ret;
  1129. ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
  1130. prepare_read_tag(con);
  1131. return 1;
  1132. }
/*
 * Read what we can from the socket.
 *
 * Drives the receive side of the connection according to con->state:
 *  - V1_BANNER: read and check the peer banner, move to
 *    V1_CONNECT_MSG and queue our connect message;
 *  - V1_CONNECT_MSG: read the connect reply and let process_connect()
 *    act on it;
 *  - OPEN: discard bytes that are being skipped, read the next tag if
 *    needed and then handle the message, ack/seq or keepalive ack
 *    that the tag announces.
 *
 * The function loops (via the "more" label) for as long as the
 * helpers keep making progress.
 *
 * Returns 0 without touching the socket if the connection is in any
 * other state.  Otherwise returns the result of the last helper
 * called; -EBADMSG and -EBADE from read_partial_message() are reported
 * as -EIO, and an unknown tag yields -1.  For several failures
 * con->error_msg is set to a description.
 */
int ceph_con_v1_try_read(struct ceph_connection *con)
{
	int ret = -1;

more:
	dout("try_read start %p state %d\n", con, con->state);
	/* nothing to read unless we are handshaking or open */
	if (con->state != CEPH_CON_S_V1_BANNER &&
	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
	    con->state != CEPH_CON_S_OPEN)
		return 0;

	BUG_ON(!con->sock);

	dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
	     con->v1.in_base_pos);

	if (con->state == CEPH_CON_S_V1_BANNER) {
		ret = read_partial_banner(con);
		if (ret <= 0)
			goto out;
		ret = process_banner(con);
		if (ret < 0)
			goto out;

		con->state = CEPH_CON_S_V1_CONNECT_MSG;

		/*
		 * Received banner is good, exchange connection info.
		 * Do not reset out_kvec, as sending our banner raced
		 * with receiving peer banner after connect completed.
		 */
		ret = prepare_write_connect(con);
		if (ret < 0)
			goto out;
		prepare_read_connect(con);

		/* Send connection info before awaiting response */
		goto out;
	}

	if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
		ret = read_partial_connect(con);
		if (ret <= 0)
			goto out;
		ret = process_connect(con);
		if (ret < 0)
			goto out;
		/* process_connect() may have changed con->state; re-evaluate */
		goto more;
	}

	WARN_ON(con->state != CEPH_CON_S_OPEN);

	if (con->v1.in_base_pos < 0) {
		/*
		 * skipping + discarding content.
		 *
		 * A negative in_base_pos is the number of bytes still to
		 * be thrown away; it is advanced towards zero as bytes
		 * are consumed.
		 */
		ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
		if (ret <= 0)
			goto out;
		dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
		con->v1.in_base_pos += ret;
		if (con->v1.in_base_pos)
			goto more;
	}
	if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
		/*
		 * what's next?
		 *
		 * Read the one-byte tag and set up the read state for
		 * whatever it announces.
		 */
		ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
		if (ret <= 0)
			goto out;
		dout("try_read got tag %d\n", con->v1.in_tag);
		switch (con->v1.in_tag) {
		case CEPH_MSGR_TAG_MSG:
			prepare_read_message(con);
			break;
		case CEPH_MSGR_TAG_ACK:
			prepare_read_ack(con);
			break;
		case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
			prepare_read_keepalive_ack(con);
			break;
		case CEPH_MSGR_TAG_CLOSE:
			/* peer asked to close: drop the socket and stop */
			ceph_con_close_socket(con);
			con->state = CEPH_CON_S_CLOSED;
			goto out;
		default:
			goto bad_tag;
		}
	}
	if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
		ret = read_partial_message(con);
		if (ret <= 0) {
			/* map message-level errors to what we return */
			switch (ret) {
			case -EBADMSG:
				con->error_msg = "bad crc/signature";
				fallthrough;
			case -EBADE:
				ret = -EIO;
				break;
			case -EIO:
				con->error_msg = "io error";
				break;
			}
			goto out;
		}
		/* tag was reset to READY: the message is being skipped */
		if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
			goto more;
		ceph_con_process_message(con);
		/* only arm the next tag read if we are still open */
		if (con->state == CEPH_CON_S_OPEN)
			prepare_read_tag(con);
		goto more;
	}
	if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
	    con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
		/*
		 * the final handshake seq exchange is semantically
		 * equivalent to an ACK
		 */
		ret = read_partial_ack(con);
		if (ret <= 0)
			goto out;
		process_ack(con);
		goto more;
	}
	if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
		ret = read_keepalive_ack(con);
		if (ret <= 0)
			goto out;
		goto more;
	}

out:
	dout("try_read done on %p ret %d\n", con, ret);
	return ret;

bad_tag:
	/* unknown tag byte: report a protocol error */
	pr_err("try_read bad tag %d\n", con->v1.in_tag);
	con->error_msg = "protocol error, garbage tag";
	ret = -1;
	goto out;
}
/*
 * Write something to the socket.  Called in a worker thread when the
 * socket appears to be writeable and we have something ready to send.
 *
 * In PREOPEN state the banner is queued, the read side is primed and
 * the TCP connect is started first.  After that the function loops
 * (via the "more" label): it flushes queued kvec data, then any
 * pending skip bytes, then the data of the current outgoing message.
 * When all of that is out and the connection is OPEN, it queues the
 * next thing to send -- a pending keepalive, the next outgoing
 * message, or an ack -- and loops again.
 *
 * Returns 0 if the connection is in a state that is not handled here
 * or if there is nothing left to write (in which case the
 * WRITE_PENDING flag is cleared); otherwise returns the result of the
 * helper that stopped the loop.
 */
int ceph_con_v1_try_write(struct ceph_connection *con)
{
	struct ceph_msg *msg;
	int ret = 1;

	dout("try_write start %p state %d\n", con, con->state);
	if (con->state != CEPH_CON_S_PREOPEN &&
	    con->state != CEPH_CON_S_V1_BANNER &&
	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
	    con->state != CEPH_CON_S_OPEN)
		return 0;

	/* open the socket first? */
	if (con->state == CEPH_CON_S_PREOPEN) {
		BUG_ON(con->sock);
		con->state = CEPH_CON_S_V1_BANNER;

		/* queue our banner and get ready to read the peer's */
		con_out_kvec_reset(con);
		prepare_write_banner(con);
		prepare_read_banner(con);

		BUG_ON(con->in_msg);
		con->v1.in_tag = CEPH_MSGR_TAG_READY;
		dout("try_write initiating connect on %p new state %d\n",
		     con, con->state);
		ret = ceph_tcp_connect(con);
		if (ret < 0) {
			con->error_msg = "connect error";
			goto out;
		}
	}

more:
	dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
	BUG_ON(!con->sock);

	/* kvec data queued? */
	if (con->v1.out_kvec_left) {
		ret = write_partial_kvec(con);
		if (ret <= 0)
			goto out;
	}
	/* bytes pending in out_skip (see ceph_con_v1_revoke())? */
	if (con->v1.out_skip) {
		ret = write_partial_skip(con);
		if (ret <= 0)
			goto out;
	}

	/* msg pages? */
	msg = con->out_msg;
	if (msg) {
		if (con->v1.out_msg_done) {
			ceph_msg_put(msg);
			con->out_msg = NULL;   /* we're done with this one */
			goto do_next;
		}

		ret = write_partial_message_data(con, msg);
		if (ret == 1)
			goto more;  /* we need to send the footer, too! */
		if (ret == 0)
			goto out;
		if (ret < 0) {
			dout("try_write write_partial_message_data err %d\n",
			     ret);
			goto out;
		}
	}

do_next:
	/* new work is only queued once the connection is open */
	if (con->state == CEPH_CON_S_OPEN) {
		if (ceph_con_flag_test_and_clear(con,
				CEPH_CON_F_KEEPALIVE_PENDING)) {
			prepare_write_keepalive(con);
			goto more;
		}
		/* is anything else pending? */
		if ((msg = ceph_con_get_out_msg(con)) != NULL) {
			prepare_write_message(con, msg);
			goto more;
		}
		/* received messages we have not acked yet? */
		if (con->in_seq > con->in_seq_acked) {
			prepare_write_ack(con);
			goto more;
		}
	}

	/* Nothing to do! */
	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
	dout("try_write nothing else to write.\n");
	ret = 0;
out:
	dout("try_write done on %p ret %d\n", con, ret);
	return ret;
}
  1355. void ceph_con_v1_revoke(struct ceph_connection *con, struct ceph_msg *msg)
  1356. {
  1357. WARN_ON(con->v1.out_skip);
  1358. /* footer */
  1359. if (con->v1.out_msg_done) {
  1360. con->v1.out_skip += con_out_kvec_skip(con);
  1361. } else {
  1362. WARN_ON(!msg->data_length);
  1363. con->v1.out_skip += sizeof_footer(con);
  1364. }
  1365. /* data, middle, front */
  1366. if (msg->data_length)
  1367. con->v1.out_skip += msg->cursor.total_resid;
  1368. if (msg->middle)
  1369. con->v1.out_skip += con_out_kvec_skip(con);
  1370. con->v1.out_skip += con_out_kvec_skip(con);
  1371. dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
  1372. con->v1.out_kvec_bytes, con->v1.out_skip);
  1373. }
  1374. void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
  1375. {
  1376. unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
  1377. unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
  1378. unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
  1379. /* skip rest of message */
  1380. con->v1.in_base_pos = con->v1.in_base_pos -
  1381. sizeof(struct ceph_msg_header) -
  1382. front_len -
  1383. middle_len -
  1384. data_len -
  1385. sizeof(struct ceph_msg_footer);
  1386. con->v1.in_tag = CEPH_MSGR_TAG_READY;
  1387. con->in_seq++;
  1388. dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
  1389. }
  1390. bool ceph_con_v1_opened(struct ceph_connection *con)
  1391. {
  1392. return con->v1.connect_seq;
  1393. }
  1394. void ceph_con_v1_reset_session(struct ceph_connection *con)
  1395. {
  1396. con->v1.connect_seq = 0;
  1397. con->v1.peer_global_seq = 0;
  1398. }
  1399. void ceph_con_v1_reset_protocol(struct ceph_connection *con)
  1400. {
  1401. con->v1.out_skip = 0;
  1402. }