io_thread.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603
  1. // SPDX-License-Identifier: GPL-2.0-or-later
  2. /* RxRPC packet reception
  3. *
  4. * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
  5. * Written by David Howells (dhowells@redhat.com)
  6. */
  7. #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
  8. #include "ar-internal.h"
  9. static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
  10. struct sockaddr_rxrpc *peer_srx,
  11. struct sk_buff *skb);
  12. /*
  13. * handle data received on the local endpoint
  14. * - may be called in interrupt context
  15. *
  16. * [!] Note that as this is called from the encap_rcv hook, the socket is not
  17. * held locked by the caller and nothing prevents sk_user_data on the UDP from
  18. * being cleared in the middle of processing this function.
  19. *
  20. * Called with the RCU read lock held from the IP layer via UDP.
  21. */
  22. int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
  23. {
  24. struct sk_buff_head *rx_queue;
  25. struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
  26. struct task_struct *io_thread;
  27. if (unlikely(!local)) {
  28. kfree_skb(skb);
  29. return 0;
  30. }
  31. io_thread = READ_ONCE(local->io_thread);
  32. if (!io_thread) {
  33. kfree_skb(skb);
  34. return 0;
  35. }
  36. if (skb->tstamp == 0)
  37. skb->tstamp = ktime_get_real();
  38. skb->mark = RXRPC_SKB_MARK_PACKET;
  39. rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
  40. rx_queue = &local->rx_queue;
  41. #ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
  42. if (rxrpc_inject_rx_delay ||
  43. !skb_queue_empty(&local->rx_delay_queue)) {
  44. skb->tstamp = ktime_add_ms(skb->tstamp, rxrpc_inject_rx_delay);
  45. rx_queue = &local->rx_delay_queue;
  46. }
  47. #endif
  48. skb_queue_tail(rx_queue, skb);
  49. wake_up_process(io_thread);
  50. return 0;
  51. }
  52. /*
  53. * Handle an error received on the local endpoint.
  54. */
  55. void rxrpc_error_report(struct sock *sk)
  56. {
  57. struct rxrpc_local *local;
  58. struct sk_buff *skb;
  59. rcu_read_lock();
  60. local = rcu_dereference_sk_user_data(sk);
  61. if (unlikely(!local)) {
  62. rcu_read_unlock();
  63. return;
  64. }
  65. while ((skb = skb_dequeue(&sk->sk_error_queue))) {
  66. skb->mark = RXRPC_SKB_MARK_ERROR;
  67. rxrpc_new_skb(skb, rxrpc_skb_new_error_report);
  68. skb_queue_tail(&local->rx_queue, skb);
  69. }
  70. rxrpc_wake_up_io_thread(local);
  71. rcu_read_unlock();
  72. }
  73. /*
  74. * Directly produce an abort from a packet.
  75. */
  76. bool rxrpc_direct_abort(struct sk_buff *skb, enum rxrpc_abort_reason why,
  77. s32 abort_code, int err)
  78. {
  79. struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
  80. trace_rxrpc_abort(0, why, sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
  81. abort_code, err);
  82. skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
  83. skb->priority = abort_code;
  84. return false;
  85. }
  86. /*
  87. * Directly produce a connection abort from a packet.
  88. */
  89. bool rxrpc_direct_conn_abort(struct sk_buff *skb, enum rxrpc_abort_reason why,
  90. s32 abort_code, int err)
  91. {
  92. struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
  93. trace_rxrpc_abort(0, why, sp->hdr.cid, 0, sp->hdr.seq, abort_code, err);
  94. skb->mark = RXRPC_SKB_MARK_REJECT_CONN_ABORT;
  95. skb->priority = abort_code;
  96. return false;
  97. }
  98. static bool rxrpc_bad_message(struct sk_buff *skb, enum rxrpc_abort_reason why)
  99. {
  100. return rxrpc_direct_abort(skb, why, RX_PROTOCOL_ERROR, -EBADMSG);
  101. }
  102. #define just_discard true
  103. /*
  104. * Process event packets targeted at a local endpoint.
  105. */
  106. static bool rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb)
  107. {
  108. struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
  109. char v;
  110. _enter("");
  111. rxrpc_see_skb(skb, rxrpc_skb_see_version);
  112. if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) {
  113. if (v == 0)
  114. rxrpc_send_version_request(local, &sp->hdr, skb);
  115. }
  116. return true;
  117. }
  118. /*
  119. * Extract the wire header from a packet and translate the byte order.
  120. */
  121. static bool rxrpc_extract_header(struct rxrpc_skb_priv *sp,
  122. struct sk_buff *skb)
  123. {
  124. struct rxrpc_wire_header whdr;
  125. struct rxrpc_ackpacket ack;
  126. /* dig out the RxRPC connection details */
  127. if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0)
  128. return rxrpc_bad_message(skb, rxrpc_badmsg_short_hdr);
  129. memset(sp, 0, sizeof(*sp));
  130. sp->hdr.epoch = ntohl(whdr.epoch);
  131. sp->hdr.cid = ntohl(whdr.cid);
  132. sp->hdr.callNumber = ntohl(whdr.callNumber);
  133. sp->hdr.seq = ntohl(whdr.seq);
  134. sp->hdr.serial = ntohl(whdr.serial);
  135. sp->hdr.flags = whdr.flags;
  136. sp->hdr.type = whdr.type;
  137. sp->hdr.userStatus = whdr.userStatus;
  138. sp->hdr.securityIndex = whdr.securityIndex;
  139. sp->hdr._rsvd = ntohs(whdr._rsvd);
  140. sp->hdr.serviceId = ntohs(whdr.serviceId);
  141. if (sp->hdr.type == RXRPC_PACKET_TYPE_ACK) {
  142. if (skb_copy_bits(skb, sizeof(whdr), &ack, sizeof(ack)) < 0)
  143. return rxrpc_bad_message(skb, rxrpc_badmsg_short_ack);
  144. sp->ack.first_ack = ntohl(ack.firstPacket);
  145. sp->ack.prev_ack = ntohl(ack.previousPacket);
  146. sp->ack.acked_serial = ntohl(ack.serial);
  147. sp->ack.reason = ack.reason;
  148. sp->ack.nr_acks = ack.nAcks;
  149. }
  150. return true;
  151. }
  152. /*
  153. * Extract the abort code from an ABORT packet and stash it in skb->priority.
  154. */
  155. static bool rxrpc_extract_abort(struct sk_buff *skb)
  156. {
  157. __be32 wtmp;
  158. if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
  159. &wtmp, sizeof(wtmp)) < 0)
  160. return false;
  161. skb->priority = ntohl(wtmp);
  162. return true;
  163. }
  164. /*
  165. * Process packets received on the local endpoint
  166. */
  167. static bool rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
  168. {
  169. struct rxrpc_connection *conn;
  170. struct sockaddr_rxrpc peer_srx;
  171. struct rxrpc_skb_priv *sp;
  172. struct rxrpc_peer *peer = NULL;
  173. struct sk_buff *skb = *_skb;
  174. bool ret = false;
  175. skb_pull(skb, sizeof(struct udphdr));
  176. sp = rxrpc_skb(skb);
  177. /* dig out the RxRPC connection details */
  178. if (!rxrpc_extract_header(sp, skb))
  179. return just_discard;
  180. if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
  181. static int lose;
  182. if ((lose++ & 7) == 7) {
  183. trace_rxrpc_rx_lose(sp);
  184. return just_discard;
  185. }
  186. }
  187. trace_rxrpc_rx_packet(sp);
  188. switch (sp->hdr.type) {
  189. case RXRPC_PACKET_TYPE_VERSION:
  190. if (rxrpc_to_client(sp))
  191. return just_discard;
  192. return rxrpc_input_version(local, skb);
  193. case RXRPC_PACKET_TYPE_BUSY:
  194. if (rxrpc_to_server(sp))
  195. return just_discard;
  196. fallthrough;
  197. case RXRPC_PACKET_TYPE_ACK:
  198. case RXRPC_PACKET_TYPE_ACKALL:
  199. if (sp->hdr.callNumber == 0)
  200. return rxrpc_bad_message(skb, rxrpc_badmsg_zero_call);
  201. break;
  202. case RXRPC_PACKET_TYPE_ABORT:
  203. if (!rxrpc_extract_abort(skb))
  204. return just_discard; /* Just discard if malformed */
  205. break;
  206. case RXRPC_PACKET_TYPE_DATA:
  207. if (sp->hdr.callNumber == 0)
  208. return rxrpc_bad_message(skb, rxrpc_badmsg_zero_call);
  209. if (sp->hdr.seq == 0)
  210. return rxrpc_bad_message(skb, rxrpc_badmsg_zero_seq);
  211. /* Unshare the packet so that it can be modified for in-place
  212. * decryption.
  213. */
  214. if (sp->hdr.securityIndex != 0) {
  215. skb = skb_unshare(skb, GFP_ATOMIC);
  216. if (!skb) {
  217. rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare_nomem);
  218. *_skb = NULL;
  219. return just_discard;
  220. }
  221. if (skb != *_skb) {
  222. rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare);
  223. *_skb = skb;
  224. rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
  225. sp = rxrpc_skb(skb);
  226. }
  227. }
  228. break;
  229. case RXRPC_PACKET_TYPE_CHALLENGE:
  230. if (rxrpc_to_server(sp))
  231. return just_discard;
  232. break;
  233. case RXRPC_PACKET_TYPE_RESPONSE:
  234. if (rxrpc_to_client(sp))
  235. return just_discard;
  236. break;
  237. /* Packet types 9-11 should just be ignored. */
  238. case RXRPC_PACKET_TYPE_PARAMS:
  239. case RXRPC_PACKET_TYPE_10:
  240. case RXRPC_PACKET_TYPE_11:
  241. return just_discard;
  242. default:
  243. return rxrpc_bad_message(skb, rxrpc_badmsg_unsupported_packet);
  244. }
  245. if (sp->hdr.serviceId == 0)
  246. return rxrpc_bad_message(skb, rxrpc_badmsg_zero_service);
  247. if (WARN_ON_ONCE(rxrpc_extract_addr_from_skb(&peer_srx, skb) < 0))
  248. return just_discard; /* Unsupported address type. */
  249. if (peer_srx.transport.family != local->srx.transport.family &&
  250. (peer_srx.transport.family == AF_INET &&
  251. local->srx.transport.family != AF_INET6)) {
  252. pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n",
  253. peer_srx.transport.family,
  254. local->srx.transport.family);
  255. return just_discard; /* Wrong address type. */
  256. }
  257. if (rxrpc_to_client(sp)) {
  258. rcu_read_lock();
  259. conn = rxrpc_find_client_connection_rcu(local, &peer_srx, skb);
  260. conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
  261. rcu_read_unlock();
  262. if (!conn)
  263. return rxrpc_protocol_error(skb, rxrpc_eproto_no_client_conn);
  264. ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
  265. rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
  266. return ret;
  267. }
  268. /* We need to look up service connections by the full protocol
  269. * parameter set. We look up the peer first as an intermediate step
  270. * and then the connection from the peer's tree.
  271. */
  272. rcu_read_lock();
  273. peer = rxrpc_lookup_peer_rcu(local, &peer_srx);
  274. if (!peer) {
  275. rcu_read_unlock();
  276. return rxrpc_new_incoming_call(local, NULL, NULL, &peer_srx, skb);
  277. }
  278. conn = rxrpc_find_service_conn_rcu(peer, skb);
  279. conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
  280. if (conn) {
  281. rcu_read_unlock();
  282. ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
  283. rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
  284. return ret;
  285. }
  286. peer = rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input);
  287. rcu_read_unlock();
  288. ret = rxrpc_new_incoming_call(local, peer, NULL, &peer_srx, skb);
  289. rxrpc_put_peer(peer, rxrpc_peer_put_input);
  290. return ret;
  291. }
  292. /*
  293. * Deal with a packet that's associated with an extant connection.
  294. */
  295. static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
  296. struct sockaddr_rxrpc *peer_srx,
  297. struct sk_buff *skb)
  298. {
  299. struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
  300. struct rxrpc_channel *chan;
  301. struct rxrpc_call *call = NULL;
  302. unsigned int channel;
  303. if (sp->hdr.securityIndex != conn->security_ix)
  304. return rxrpc_direct_abort(skb, rxrpc_eproto_wrong_security,
  305. RXKADINCONSISTENCY, -EBADMSG);
  306. if (sp->hdr.serviceId != conn->service_id) {
  307. int old_id;
  308. if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
  309. return rxrpc_protocol_error(skb, rxrpc_eproto_reupgrade);
  310. old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
  311. sp->hdr.serviceId);
  312. if (old_id != conn->orig_service_id &&
  313. old_id != sp->hdr.serviceId)
  314. return rxrpc_protocol_error(skb, rxrpc_eproto_bad_upgrade);
  315. }
  316. if (after(sp->hdr.serial, conn->hi_serial))
  317. conn->hi_serial = sp->hdr.serial;
  318. /* It's a connection-level packet if the call number is 0. */
  319. if (sp->hdr.callNumber == 0)
  320. return rxrpc_input_conn_packet(conn, skb);
  321. /* Deal with path MTU discovery probing. */
  322. if (sp->hdr.type == RXRPC_PACKET_TYPE_ACK &&
  323. conn->pmtud_probe &&
  324. after_eq(sp->ack.acked_serial, conn->pmtud_probe))
  325. rxrpc_input_probe_for_pmtud(conn, sp->ack.acked_serial, false);
  326. /* Call-bound packets are routed by connection channel. */
  327. channel = sp->hdr.cid & RXRPC_CHANNELMASK;
  328. chan = &conn->channels[channel];
  329. /* Ignore really old calls */
  330. if (sp->hdr.callNumber < chan->last_call)
  331. return just_discard;
  332. if (sp->hdr.callNumber == chan->last_call) {
  333. if (chan->call ||
  334. sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
  335. return just_discard;
  336. /* For the previous service call, if completed successfully, we
  337. * discard all further packets.
  338. */
  339. if (rxrpc_conn_is_service(conn) &&
  340. chan->last_type == RXRPC_PACKET_TYPE_ACK)
  341. return just_discard;
  342. /* But otherwise we need to retransmit the final packet from
  343. * data cached in the connection record.
  344. */
  345. if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
  346. trace_rxrpc_rx_data(chan->call_debug_id,
  347. sp->hdr.seq,
  348. sp->hdr.serial,
  349. sp->hdr.flags);
  350. rxrpc_conn_retransmit_call(conn, skb, channel);
  351. return just_discard;
  352. }
  353. call = rxrpc_try_get_call(chan->call, rxrpc_call_get_input);
  354. if (sp->hdr.callNumber > chan->call_id) {
  355. if (rxrpc_to_client(sp)) {
  356. if (call)
  357. rxrpc_put_call(call, rxrpc_call_put_input);
  358. return rxrpc_protocol_error(skb,
  359. rxrpc_eproto_unexpected_implicit_end);
  360. }
  361. if (call) {
  362. rxrpc_implicit_end_call(call, skb);
  363. rxrpc_put_call(call, rxrpc_call_put_input);
  364. call = NULL;
  365. }
  366. }
  367. if (!call) {
  368. if (rxrpc_to_client(sp))
  369. return rxrpc_protocol_error(skb, rxrpc_eproto_no_client_call);
  370. return rxrpc_new_incoming_call(conn->local, conn->peer, conn,
  371. peer_srx, skb);
  372. }
  373. rxrpc_queue_rx_call_packet(call, skb);
  374. rxrpc_put_call(call, rxrpc_call_put_input);
  375. return true;
  376. }
  377. /*
  378. * I/O and event handling thread.
  379. */
  380. int rxrpc_io_thread(void *data)
  381. {
  382. struct rxrpc_connection *conn;
  383. struct sk_buff_head rx_queue;
  384. struct rxrpc_local *local = data;
  385. struct rxrpc_call *call;
  386. struct sk_buff *skb;
  387. #ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
  388. ktime_t now;
  389. #endif
  390. bool should_stop;
  391. LIST_HEAD(conn_attend_q);
  392. LIST_HEAD(call_attend_q);
  393. complete(&local->io_thread_ready);
  394. skb_queue_head_init(&rx_queue);
  395. set_user_nice(current, MIN_NICE);
  396. for (;;) {
  397. rxrpc_inc_stat(local->rxnet, stat_io_loop);
  398. /* Inject a delay into packets if requested. */
  399. #ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
  400. now = ktime_get_real();
  401. while ((skb = skb_peek(&local->rx_delay_queue))) {
  402. if (ktime_before(now, skb->tstamp))
  403. break;
  404. skb = skb_dequeue(&local->rx_delay_queue);
  405. skb_queue_tail(&local->rx_queue, skb);
  406. }
  407. #endif
  408. if (!skb_queue_empty(&local->rx_queue)) {
  409. spin_lock_irq(&local->rx_queue.lock);
  410. skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
  411. spin_unlock_irq(&local->rx_queue.lock);
  412. trace_rxrpc_iothread_rx(local, skb_queue_len(&rx_queue));
  413. }
  414. /* Distribute packets and errors. */
  415. while ((skb = __skb_dequeue(&rx_queue))) {
  416. struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
  417. switch (skb->mark) {
  418. case RXRPC_SKB_MARK_PACKET:
  419. skb->priority = 0;
  420. if (!rxrpc_input_packet(local, &skb))
  421. rxrpc_reject_packet(local, skb);
  422. trace_rxrpc_rx_done(skb->mark, skb->priority);
  423. rxrpc_free_skb(skb, rxrpc_skb_put_input);
  424. break;
  425. case RXRPC_SKB_MARK_ERROR:
  426. rxrpc_input_error(local, skb);
  427. rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
  428. break;
  429. case RXRPC_SKB_MARK_SERVICE_CONN_SECURED:
  430. rxrpc_input_conn_event(sp->poke_conn, skb);
  431. rxrpc_put_connection(sp->poke_conn, rxrpc_conn_put_poke);
  432. rxrpc_free_skb(skb, rxrpc_skb_put_conn_secured);
  433. break;
  434. default:
  435. WARN_ON_ONCE(1);
  436. rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
  437. break;
  438. }
  439. }
  440. /* Deal with connections that want immediate attention. */
  441. if (!list_empty_careful(&local->conn_attend_q)) {
  442. spin_lock_irq(&local->lock);
  443. list_splice_tail_init(&local->conn_attend_q, &conn_attend_q);
  444. spin_unlock_irq(&local->lock);
  445. }
  446. while ((conn = list_first_entry_or_null(&conn_attend_q,
  447. struct rxrpc_connection,
  448. attend_link))) {
  449. spin_lock_irq(&local->lock);
  450. list_del_init(&conn->attend_link);
  451. spin_unlock_irq(&local->lock);
  452. rxrpc_input_conn_event(conn, NULL);
  453. rxrpc_put_connection(conn, rxrpc_conn_put_poke);
  454. }
  455. if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
  456. &local->client_conn_flags))
  457. rxrpc_discard_expired_client_conns(local);
  458. /* Deal with calls that want immediate attention. */
  459. spin_lock_irq(&local->lock);
  460. list_splice_tail_init(&local->call_attend_q, &call_attend_q);
  461. spin_unlock_irq(&local->lock);
  462. while ((call = list_first_entry_or_null(&call_attend_q,
  463. struct rxrpc_call,
  464. attend_link))) {
  465. spin_lock_irq(&local->lock);
  466. list_del_init(&call->attend_link);
  467. spin_unlock_irq(&local->lock);
  468. trace_rxrpc_call_poked(call);
  469. rxrpc_input_call_event(call);
  470. rxrpc_put_call(call, rxrpc_call_put_poke);
  471. }
  472. if (!list_empty(&local->new_client_calls))
  473. rxrpc_connect_client_calls(local);
  474. set_current_state(TASK_INTERRUPTIBLE);
  475. should_stop = kthread_should_stop();
  476. if (!skb_queue_empty(&local->rx_queue) ||
  477. !list_empty(&local->call_attend_q) ||
  478. !list_empty(&local->conn_attend_q) ||
  479. !list_empty(&local->new_client_calls) ||
  480. test_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
  481. &local->client_conn_flags)) {
  482. __set_current_state(TASK_RUNNING);
  483. continue;
  484. }
  485. if (should_stop)
  486. break;
  487. #ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
  488. skb = skb_peek(&local->rx_delay_queue);
  489. if (skb) {
  490. unsigned long timeout;
  491. ktime_t tstamp = skb->tstamp;
  492. ktime_t now = ktime_get_real();
  493. s64 delay_ns = ktime_to_ns(ktime_sub(tstamp, now));
  494. if (delay_ns <= 0) {
  495. __set_current_state(TASK_RUNNING);
  496. continue;
  497. }
  498. timeout = nsecs_to_jiffies(delay_ns);
  499. timeout = umax(timeout, 1);
  500. schedule_timeout(timeout);
  501. __set_current_state(TASK_RUNNING);
  502. continue;
  503. }
  504. #endif
  505. schedule();
  506. }
  507. __set_current_state(TASK_RUNNING);
  508. rxrpc_see_local(local, rxrpc_local_stop);
  509. rxrpc_destroy_local(local);
  510. WRITE_ONCE(local->io_thread, NULL);
  511. rxrpc_see_local(local, rxrpc_local_stopped);
  512. return 0;
  513. }