stream.c 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. // SPDX-License-Identifier: GPL-2.0
  2. /*
  3. * SUCS NET3:
  4. *
  5. * Generic stream handling routines. These are generic for most
  6. * protocols. Even IP. Tonight 8-).
  7. * This is used because TCP, LLC (others too) layer all have mostly
  8. * identical sendmsg() and recvmsg() code.
  9. * So we (will) share it here.
  10. *
  11. * Authors: Arnaldo Carvalho de Melo <acme@conectiva.com.br>
  12. * (from old tcp.c code)
  13. * Alan Cox <alan@lxorguk.ukuu.org.uk> (Borrowed comments 8-))
  14. */
  15. #include <linux/module.h>
  16. #include <linux/sched/signal.h>
  17. #include <linux/net.h>
  18. #include <linux/signal.h>
  19. #include <linux/tcp.h>
  20. #include <linux/wait.h>
  21. #include <net/sock.h>
  22. /**
  23. * sk_stream_write_space - stream socket write_space callback.
  24. * @sk: pointer to the socket structure
  25. *
  26. * This function is invoked when there's space available in the socket's
  27. * send buffer for writing. It first checks if the socket is writable,
  28. * clears the SOCK_NOSPACE flag indicating that memory for writing
  29. * is now available, wakes up any processes waiting for write operations
  30. * and sends asynchronous notifications if needed.
  31. */
  32. void sk_stream_write_space(struct sock *sk)
  33. {
  34. struct socket *sock = sk->sk_socket;
  35. struct socket_wq *wq;
  36. if (__sk_stream_is_writeable(sk, 1) && sock) {
  37. clear_bit(SOCK_NOSPACE, &sock->flags);
  38. rcu_read_lock();
  39. wq = rcu_dereference(sk->sk_wq);
  40. if (skwq_has_sleeper(wq))
  41. wake_up_interruptible_poll(&wq->wait, EPOLLOUT |
  42. EPOLLWRNORM | EPOLLWRBAND);
  43. if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
  44. sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
  45. rcu_read_unlock();
  46. }
  47. }
  48. /**
  49. * sk_stream_wait_connect - Wait for a socket to get into the connected state
  50. * @sk: sock to wait on
  51. * @timeo_p: for how long to wait
  52. *
  53. * Must be called with the socket locked.
  54. */
  55. int sk_stream_wait_connect(struct sock *sk, long *timeo_p)
  56. {
  57. DEFINE_WAIT_FUNC(wait, woken_wake_function);
  58. struct task_struct *tsk = current;
  59. int done;
  60. do {
  61. int err = sock_error(sk);
  62. if (err)
  63. return err;
  64. if ((1 << sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV))
  65. return -EPIPE;
  66. if (!*timeo_p)
  67. return -EAGAIN;
  68. if (signal_pending(tsk))
  69. return sock_intr_errno(*timeo_p);
  70. add_wait_queue(sk_sleep(sk), &wait);
  71. sk->sk_write_pending++;
  72. done = sk_wait_event(sk, timeo_p,
  73. !READ_ONCE(sk->sk_err) &&
  74. !((1 << READ_ONCE(sk->sk_state)) &
  75. ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)), &wait);
  76. remove_wait_queue(sk_sleep(sk), &wait);
  77. sk->sk_write_pending--;
  78. } while (!done);
  79. return done < 0 ? done : 0;
  80. }
  81. EXPORT_SYMBOL(sk_stream_wait_connect);
  82. /**
  83. * sk_stream_closing - Return 1 if we still have things to send in our buffers.
  84. * @sk: socket to verify
  85. */
  86. static int sk_stream_closing(const struct sock *sk)
  87. {
  88. return (1 << READ_ONCE(sk->sk_state)) &
  89. (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK);
  90. }
  91. void sk_stream_wait_close(struct sock *sk, long timeout)
  92. {
  93. if (timeout) {
  94. DEFINE_WAIT_FUNC(wait, woken_wake_function);
  95. add_wait_queue(sk_sleep(sk), &wait);
  96. do {
  97. if (sk_wait_event(sk, &timeout, !sk_stream_closing(sk), &wait))
  98. break;
  99. } while (!signal_pending(current) && timeout);
  100. remove_wait_queue(sk_sleep(sk), &wait);
  101. }
  102. }
  103. EXPORT_SYMBOL(sk_stream_wait_close);
  104. /**
  105. * sk_stream_wait_memory - Wait for more memory for a socket
  106. * @sk: socket to wait for memory
  107. * @timeo_p: for how long
  108. */
  109. int sk_stream_wait_memory(struct sock *sk, long *timeo_p)
  110. {
  111. int ret, err = 0;
  112. long vm_wait = 0;
  113. long current_timeo = *timeo_p;
  114. DEFINE_WAIT_FUNC(wait, woken_wake_function);
  115. if (sk_stream_memory_free(sk))
  116. current_timeo = vm_wait = get_random_u32_below(HZ / 5) + 2;
  117. add_wait_queue(sk_sleep(sk), &wait);
  118. while (1) {
  119. sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
  120. if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
  121. goto do_error;
  122. if (!*timeo_p)
  123. goto do_eagain;
  124. if (signal_pending(current))
  125. goto do_interrupted;
  126. sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
  127. if (sk_stream_memory_free(sk) && !vm_wait)
  128. break;
  129. set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
  130. sk->sk_write_pending++;
  131. ret = sk_wait_event(sk, &current_timeo, READ_ONCE(sk->sk_err) ||
  132. (READ_ONCE(sk->sk_shutdown) & SEND_SHUTDOWN) ||
  133. (sk_stream_memory_free(sk) && !vm_wait),
  134. &wait);
  135. sk->sk_write_pending--;
  136. if (ret < 0)
  137. goto do_error;
  138. if (vm_wait) {
  139. vm_wait -= current_timeo;
  140. current_timeo = *timeo_p;
  141. if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
  142. (current_timeo -= vm_wait) < 0)
  143. current_timeo = 0;
  144. vm_wait = 0;
  145. }
  146. *timeo_p = current_timeo;
  147. }
  148. out:
  149. if (!sock_flag(sk, SOCK_DEAD))
  150. remove_wait_queue(sk_sleep(sk), &wait);
  151. return err;
  152. do_error:
  153. err = -EPIPE;
  154. goto out;
  155. do_eagain:
  156. /* Make sure that whenever EAGAIN is returned, EPOLLOUT event can
  157. * be generated later.
  158. * When TCP receives ACK packets that make room, tcp_check_space()
  159. * only calls tcp_new_space() if SOCK_NOSPACE is set.
  160. */
  161. set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
  162. err = -EAGAIN;
  163. goto out;
  164. do_interrupted:
  165. err = sock_intr_errno(*timeo_p);
  166. goto out;
  167. }
  168. EXPORT_SYMBOL(sk_stream_wait_memory);
  169. int sk_stream_error(struct sock *sk, int flags, int err)
  170. {
  171. if (err == -EPIPE)
  172. err = sock_error(sk) ? : -EPIPE;
  173. if (err == -EPIPE && !(flags & MSG_NOSIGNAL))
  174. send_sig(SIGPIPE, current, 0);
  175. return err;
  176. }
  177. EXPORT_SYMBOL(sk_stream_error);
  178. void sk_stream_kill_queues(struct sock *sk)
  179. {
  180. /* First the read buffer. */
  181. __skb_queue_purge(&sk->sk_receive_queue);
  182. /* Next, the error queue.
  183. * We need to use queue lock, because other threads might
  184. * add packets to the queue without socket lock being held.
  185. */
  186. skb_queue_purge(&sk->sk_error_queue);
  187. /* Next, the write queue. */
  188. WARN_ON_ONCE(!skb_queue_empty(&sk->sk_write_queue));
  189. /* Account for returned memory. */
  190. sk_mem_reclaim_final(sk);
  191. WARN_ON_ONCE(sk->sk_wmem_queued);
  192. /* It is _impossible_ for the backlog to contain anything
  193. * when we get here. All user references to this socket
  194. * have gone away, only the net layer knows can touch it.
  195. */
  196. }
  197. EXPORT_SYMBOL(sk_stream_kill_queues);