lowcomms.c 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983
  1. // SPDX-License-Identifier: GPL-2.0-only
  2. /******************************************************************************
  3. *******************************************************************************
  4. **
  5. ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
  6. ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
  7. **
  8. **
  9. *******************************************************************************
  10. ******************************************************************************/
  11. /*
  12. * lowcomms.c
  13. *
  14. * This is the "low-level" comms layer.
  15. *
  16. * It is responsible for sending/receiving messages
  17. * from other nodes in the cluster.
  18. *
  19. * Cluster nodes are referred to by their nodeids. nodeids are
  20. * simply 32 bit numbers to the locking module - if they need to
  21. * be expanded for the cluster infrastructure then that is its
  22. * responsibility. It is this layer's
  23. * responsibility to resolve these into IP address or
  24. * whatever it needs for inter-node communication.
  25. *
 * The comms level runs on workqueues rather than dedicated threads. Receive
 * work deals mainly with receiving messages from other nodes and passing
 * them up to the mid-level comms layer (which understands the message
 * format) for execution by the locking core. Send work does all the setting
 * up of connections to remote nodes and the sending of data. Callers are
 * not allowed to send their own data because it may cause them to wait in
 * times of high load. Also, this way, the send work can collect together
 * messages bound for one node and send them in one block.
  35. *
  36. * lowcomms will choose to use either TCP or SCTP as its transport layer
  37. * depending on the configuration variable 'protocol'. This should be set
  38. * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
  39. * cluster-wide mechanism as it must be the same on all nodes of the cluster
  40. * for the DLM to function.
  41. *
  42. */
  43. #include <asm/ioctls.h>
  44. #include <net/sock.h>
  45. #include <net/tcp.h>
  46. #include <linux/pagemap.h>
  47. #include <linux/file.h>
  48. #include <linux/mutex.h>
  49. #include <linux/sctp.h>
  50. #include <linux/slab.h>
  51. #include <net/sctp/sctp.h>
  52. #include <net/ipv6.h>
  53. #include <trace/events/dlm.h>
  54. #include <trace/events/sock.h>
  55. #include "dlm_internal.h"
  56. #include "lowcomms.h"
  57. #include "midcomms.h"
  58. #include "memory.h"
  59. #include "config.h"
/* How long shutdown_connection() waits for con->sock to be cleared. */
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
/* NOTE(review): not referenced in the visible part of this file; presumably
 * caps the number of queued processqueue entries — confirm.
 */
#define DLM_MAX_PROCESS_BUFFERS 24
/* NOTE(review): 4 MiB; not referenced in the visible part of this file,
 * presumably a socket receive-buffer size requirement — confirm.
 */
#define NEEDED_RMEM (4*1024*1024)
/* Per-node connection state; every connection lives in connection_hash[]. */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	/* This semaphore is used to allow parallel recv/send in read
	 * lock mode. When we release a sock we need to hold the write lock.
	 *
	 * However this is locking code and not nice. When we remove the
	 * othercon handling we can look into other mechanism to synchronize
	 * io handling to call sock_release() at the right time.
	 */
	struct rw_semaphore sock_lock;
	unsigned long flags;	/* bit numbers CF_* below */
/* sending paused; lowcomms_queue_swork() will not queue while set and
 * lowcomms_write_space() clears it
 */
#define CF_APP_LIMITED 0
/* rwork queued and not yet run (set in lowcomms_queue_rwork()) */
#define CF_RECV_PENDING 1
/* swork queued and not yet run (set in lowcomms_queue_swork()) */
#define CF_SEND_PENDING 2
/* set by lowcomms_data_ready() when the socket reports new data */
#define CF_RECV_INTR 3
/* no new rwork/swork is queued while set (see stop_connection_io()) */
#define CF_IO_STOP 4
/* NOTE(review): unused in the visible code; presumably marks an othercon — confirm */
#define CF_IS_OTHERCON 5
	struct list_head writequeue; /* List of outgoing writequeue_entries */
	/* protects writequeue; must be held when queuing swork */
	spinlock_t writequeue_lock;
	int retries;	/* reset to 0 by close_connection() */
	struct hlist_node list;	/* entry in connection_hash[] */
	/* Due to some connect()/accept() races we currently have this cross
	 * over connection attempt: a second connection for one node.
	 *
	 * There is a solution to avoid the race by introducing a connect
	 * rule, e.g. our_nodeid > nodeid_to_connect decides who is allowed to
	 * connect. The other side can still connect, but that will only be
	 * taken to mean that it wants to reconnect.
	 *
	 * However changing to this behaviour will break backwards compatibility.
	 * In a DLM protocol major version upgrade we should remove this!
	 */
	struct connection *othercon;
	struct work_struct rwork; /* receive worker */
	struct work_struct swork; /* send worker */
	/* shutdown_connection() waits here until sock becomes NULL */
	wait_queue_head_t shutdown_wait;
	/* NOTE(review): presumably bytes of an incomplete message carried over
	 * to the next receive — confirm
	 */
	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
	int rx_leftover;	/* reset to 0 by close_connection() */
	/* mark, addr_count, curr_addr_index and addr[] are accessed under addrs_lock */
	int mark;
	int addr_count;
	/* address handed out next; nodeid_to_addr() rotates it round-robin */
	int curr_addr_index;
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;
	/* NOTE(review): presumably used for deferred (S)RCU freeing — confirm */
	struct rcu_head rcu;
};
/* Recover the connection stored in a sock's sk_user_data (set by add_sock()). */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
/* State of the single listening socket (see the static listen_con). */
struct listen_connection {
	struct socket *sock;
	/* queued on io_workqueue by lowcomms_listen_data_ready() */
	struct work_struct rwork;
};
  114. #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
  115. #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
/* An entry waiting to be sent: one page of outgoing data and its messages. */
struct writequeue_entry {
	struct list_head list;	/* entry in con->writequeue */
	struct page *page;	/* data buffer; freed by dlm_page_release() */
	/* data occupies [offset, end) of page (see DLM_WQ_LENGTH_BYTES) */
	int offset;
	int len;	/* con_next_wq() treats len == 0 as nothing to send */
	int end;
	int users;	/* con_next_wq() skips the entry while non-zero */
	/* a dirty head entry is dropped by close_connection(); presumably set
	 * once the entry has been partially transmitted — confirm
	 */
	bool dirty;
	struct connection *con;
	struct list_head msgs;	/* struct dlm_msg list, torn down by free_entry() */
	struct kref ref;	/* final put frees page and entry (dlm_page_release()) */
};
/* A message carried on a writequeue_entry's msgs list. */
struct dlm_msg {
	/* owning entry; dlm_msg_release() drops a reference on it */
	struct writequeue_entry *entry;
	/* NOTE(review): presumably non-NULL for a retransmitted copy; free_entry()
	 * drops a reference on it — confirm
	 */
	struct dlm_msg *orig_msg;
	/* cleared by free_entry() on the original when its copy is dropped */
	bool retransmit;
	/* NOTE(review): presumably points at the message data in entry->page — confirm */
	void *ppc;
	int len;
	int idx; /* new()/commit() idx exchange */
	struct list_head list;	/* entry in writequeue_entry->msgs */
	struct kref ref;	/* final put runs dlm_msg_release() */
};
/* A received buffer waiting on processqueue for process_dlm_messages(). */
struct processqueue_entry {
	unsigned char *buf;	/* allocated by new_processqueue_entry() */
	int nodeid;	/* passed to dlm_process_incoming_buffer() */
	int buflen;	/* byte count passed to dlm_process_incoming_buffer() */
	struct list_head list;	/* entry in processqueue */
};
/* Transport-specific (TCP or SCTP) parameters and hooks. */
struct dlm_proto_ops {
	/* NOTE(review): presumably forwarded as nodeid_to_addr()'s try_new_addr — confirm */
	bool try_new_addr;
	const char *name;
	int proto;
	int how;	/* passed to kernel_sock_shutdown() by shutdown_connection() */
	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};
/* Saved socket callbacks: restore_callbacks() reinstalls them and
 * lowcomms_error_report() chains to sk_error_report. NOTE(review):
 * presumably captured from the listening socket at setup — confirm.
 */
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;
static struct listen_connection listen_con;
/* Local addresses; the family of dlm_local_addr[0] selects IPv4 vs IPv6. */
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;	/* nodeid_to_addr() fails while this is 0 */
/* Work queues */
static struct workqueue_struct *io_workqueue;	/* runs rwork, swork and listen_con.rwork */
static struct workqueue_struct *process_workqueue;
/* All connections, bucketed by nodeid_hash(). Lookups run under
 * connections_srcu; insertions are serialised by connections_lock.
 */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);
static const struct dlm_proto_ops *dlm_proto_ops;
/* NOTE(review): status codes presumably returned by the socket I/O helpers
 * further down the file — confirm.
 */
#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3
#define DLM_IO_FLUSH 4
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);
/* single work item that drains processqueue */
static DECLARE_WORK(process_work, process_dlm_messages);
/* protects processqueue and process_dlm_messages_pending */
static DEFINE_SPINLOCK(processqueue_lock);
/* cleared by process_dlm_messages() once the queue is empty */
static bool process_dlm_messages_pending;
/* woken when processqueue_count drops to zero */
static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
/* decremented as entries are taken off processqueue */
static atomic_t processqueue_count;
static LIST_HEAD(processqueue);
  186. bool dlm_lowcomms_is_running(void)
  187. {
  188. return !!listen_con.sock;
  189. }
  190. static void lowcomms_queue_swork(struct connection *con)
  191. {
  192. assert_spin_locked(&con->writequeue_lock);
  193. if (!test_bit(CF_IO_STOP, &con->flags) &&
  194. !test_bit(CF_APP_LIMITED, &con->flags) &&
  195. !test_and_set_bit(CF_SEND_PENDING, &con->flags))
  196. queue_work(io_workqueue, &con->swork);
  197. }
  198. static void lowcomms_queue_rwork(struct connection *con)
  199. {
  200. #ifdef CONFIG_LOCKDEP
  201. WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
  202. #endif
  203. if (!test_bit(CF_IO_STOP, &con->flags) &&
  204. !test_and_set_bit(CF_RECV_PENDING, &con->flags))
  205. queue_work(io_workqueue, &con->rwork);
  206. }
  207. static void writequeue_entry_ctor(void *data)
  208. {
  209. struct writequeue_entry *entry = data;
  210. INIT_LIST_HEAD(&entry->msgs);
  211. }
  212. struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
  213. {
  214. return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
  215. 0, 0, writequeue_entry_ctor);
  216. }
  217. struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
  218. {
  219. return KMEM_CACHE(dlm_msg, 0);
  220. }
  221. /* need to held writequeue_lock */
  222. static struct writequeue_entry *con_next_wq(struct connection *con)
  223. {
  224. struct writequeue_entry *e;
  225. e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
  226. list);
  227. /* if len is zero nothing is to send, if there are users filling
  228. * buffers we wait until the users are done so we can send more.
  229. */
  230. if (!e || e->users || e->len == 0)
  231. return NULL;
  232. return e;
  233. }
  234. static struct connection *__find_con(int nodeid, int r)
  235. {
  236. struct connection *con;
  237. hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
  238. if (con->nodeid == nodeid)
  239. return con;
  240. }
  241. return NULL;
  242. }
  243. static void dlm_con_init(struct connection *con, int nodeid)
  244. {
  245. con->nodeid = nodeid;
  246. init_rwsem(&con->sock_lock);
  247. INIT_LIST_HEAD(&con->writequeue);
  248. spin_lock_init(&con->writequeue_lock);
  249. INIT_WORK(&con->swork, process_send_sockets);
  250. INIT_WORK(&con->rwork, process_recv_sockets);
  251. spin_lock_init(&con->addrs_lock);
  252. init_waitqueue_head(&con->shutdown_wait);
  253. }
  254. /*
  255. * If 'allocation' is zero then we don't attempt to create a new
  256. * connection structure for this node.
  257. */
  258. static struct connection *nodeid2con(int nodeid, gfp_t alloc)
  259. {
  260. struct connection *con, *tmp;
  261. int r;
  262. r = nodeid_hash(nodeid);
  263. con = __find_con(nodeid, r);
  264. if (con || !alloc)
  265. return con;
  266. con = kzalloc_obj(*con, alloc);
  267. if (!con)
  268. return NULL;
  269. dlm_con_init(con, nodeid);
  270. spin_lock(&connections_lock);
  271. /* Because multiple workqueues/threads calls this function it can
  272. * race on multiple cpu's. Instead of locking hot path __find_con()
  273. * we just check in rare cases of recently added nodes again
  274. * under protection of connections_lock. If this is the case we
  275. * abort our connection creation and return the existing connection.
  276. */
  277. tmp = __find_con(nodeid, r);
  278. if (tmp) {
  279. spin_unlock(&connections_lock);
  280. kfree(con);
  281. return tmp;
  282. }
  283. hlist_add_head_rcu(&con->list, &connection_hash[r]);
  284. spin_unlock(&connections_lock);
  285. return con;
  286. }
  287. static int addr_compare(const struct sockaddr_storage *x,
  288. const struct sockaddr_storage *y)
  289. {
  290. switch (x->ss_family) {
  291. case AF_INET: {
  292. struct sockaddr_in *sinx = (struct sockaddr_in *)x;
  293. struct sockaddr_in *siny = (struct sockaddr_in *)y;
  294. if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
  295. return 0;
  296. if (sinx->sin_port != siny->sin_port)
  297. return 0;
  298. break;
  299. }
  300. case AF_INET6: {
  301. struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
  302. struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
  303. if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
  304. return 0;
  305. if (sinx->sin6_port != siny->sin6_port)
  306. return 0;
  307. break;
  308. }
  309. default:
  310. return 0;
  311. }
  312. return 1;
  313. }
  314. static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
  315. struct sockaddr *sa_out, bool try_new_addr,
  316. unsigned int *mark)
  317. {
  318. struct sockaddr_storage sas;
  319. struct connection *con;
  320. int idx;
  321. if (!dlm_local_count)
  322. return -1;
  323. idx = srcu_read_lock(&connections_srcu);
  324. con = nodeid2con(nodeid, 0);
  325. if (!con) {
  326. srcu_read_unlock(&connections_srcu, idx);
  327. return -ENOENT;
  328. }
  329. spin_lock(&con->addrs_lock);
  330. if (!con->addr_count) {
  331. spin_unlock(&con->addrs_lock);
  332. srcu_read_unlock(&connections_srcu, idx);
  333. return -ENOENT;
  334. }
  335. memcpy(&sas, &con->addr[con->curr_addr_index],
  336. sizeof(struct sockaddr_storage));
  337. if (try_new_addr) {
  338. con->curr_addr_index++;
  339. if (con->curr_addr_index == con->addr_count)
  340. con->curr_addr_index = 0;
  341. }
  342. *mark = con->mark;
  343. spin_unlock(&con->addrs_lock);
  344. if (sas_out)
  345. memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
  346. if (!sa_out) {
  347. srcu_read_unlock(&connections_srcu, idx);
  348. return 0;
  349. }
  350. if (dlm_local_addr[0].ss_family == AF_INET) {
  351. struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
  352. struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
  353. ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
  354. } else {
  355. struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
  356. struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
  357. ret6->sin6_addr = in6->sin6_addr;
  358. }
  359. srcu_read_unlock(&connections_srcu, idx);
  360. return 0;
  361. }
  362. static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
  363. unsigned int *mark)
  364. {
  365. struct connection *con;
  366. int i, idx, addr_i;
  367. idx = srcu_read_lock(&connections_srcu);
  368. for (i = 0; i < CONN_HASH_SIZE; i++) {
  369. hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
  370. WARN_ON_ONCE(!con->addr_count);
  371. spin_lock(&con->addrs_lock);
  372. for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
  373. if (addr_compare(&con->addr[addr_i], addr)) {
  374. *nodeid = con->nodeid;
  375. *mark = con->mark;
  376. spin_unlock(&con->addrs_lock);
  377. srcu_read_unlock(&connections_srcu, idx);
  378. return 0;
  379. }
  380. }
  381. spin_unlock(&con->addrs_lock);
  382. }
  383. }
  384. srcu_read_unlock(&connections_srcu, idx);
  385. return -ENOENT;
  386. }
  387. static bool dlm_lowcomms_con_has_addr(const struct connection *con,
  388. const struct sockaddr_storage *addr)
  389. {
  390. int i;
  391. for (i = 0; i < con->addr_count; i++) {
  392. if (addr_compare(&con->addr[i], addr))
  393. return true;
  394. }
  395. return false;
  396. }
  397. int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr)
  398. {
  399. struct connection *con;
  400. bool ret;
  401. int idx;
  402. idx = srcu_read_lock(&connections_srcu);
  403. con = nodeid2con(nodeid, GFP_NOFS);
  404. if (!con) {
  405. srcu_read_unlock(&connections_srcu, idx);
  406. return -ENOMEM;
  407. }
  408. spin_lock(&con->addrs_lock);
  409. if (!con->addr_count) {
  410. memcpy(&con->addr[0], addr, sizeof(*addr));
  411. con->addr_count = 1;
  412. con->mark = dlm_config.ci_mark;
  413. spin_unlock(&con->addrs_lock);
  414. srcu_read_unlock(&connections_srcu, idx);
  415. return 0;
  416. }
  417. ret = dlm_lowcomms_con_has_addr(con, addr);
  418. if (ret) {
  419. spin_unlock(&con->addrs_lock);
  420. srcu_read_unlock(&connections_srcu, idx);
  421. return -EEXIST;
  422. }
  423. if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
  424. spin_unlock(&con->addrs_lock);
  425. srcu_read_unlock(&connections_srcu, idx);
  426. return -ENOSPC;
  427. }
  428. memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
  429. srcu_read_unlock(&connections_srcu, idx);
  430. spin_unlock(&con->addrs_lock);
  431. return 0;
  432. }
  433. /* Data available on socket or listen socket received a connect */
  434. static void lowcomms_data_ready(struct sock *sk)
  435. {
  436. struct connection *con = sock2con(sk);
  437. trace_sk_data_ready(sk);
  438. set_bit(CF_RECV_INTR, &con->flags);
  439. lowcomms_queue_rwork(con);
  440. }
/*
 * sk_write_space callback installed on connection sockets by add_sock().
 * Clears SOCK_NOSPACE. If sending had been paused (CF_APP_LIMITED), it also
 * decrements sk_write_pending and clears SOCKWQ_ASYNC_NOSPACE. It then
 * tries to requeue the send worker.
 */
static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	clear_bit(SOCK_NOSPACE, &con->sock->flags);
	/* lowcomms_queue_swork() requires writequeue_lock */
	spin_lock_bh(&con->writequeue_lock);
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		/* NOTE(review): presumably balances an increment made where
		 * CF_APP_LIMITED is set — confirm
		 */
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}
	lowcomms_queue_swork(con);
	spin_unlock_bh(&con->writequeue_lock);
}
  453. static void lowcomms_state_change(struct sock *sk)
  454. {
  455. /* SCTP layer is not calling sk_data_ready when the connection
  456. * is done, so we catch the signal through here.
  457. */
  458. if (sk->sk_shutdown & RCV_SHUTDOWN)
  459. lowcomms_data_ready(sk);
  460. }
  461. static void lowcomms_listen_data_ready(struct sock *sk)
  462. {
  463. trace_sk_data_ready(sk);
  464. queue_work(io_workqueue, &listen_con.rwork);
  465. }
  466. int dlm_lowcomms_connect_node(int nodeid)
  467. {
  468. struct connection *con;
  469. int idx;
  470. idx = srcu_read_lock(&connections_srcu);
  471. con = nodeid2con(nodeid, 0);
  472. if (WARN_ON_ONCE(!con)) {
  473. srcu_read_unlock(&connections_srcu, idx);
  474. return -ENOENT;
  475. }
  476. down_read(&con->sock_lock);
  477. if (!con->sock) {
  478. spin_lock_bh(&con->writequeue_lock);
  479. lowcomms_queue_swork(con);
  480. spin_unlock_bh(&con->writequeue_lock);
  481. }
  482. up_read(&con->sock_lock);
  483. srcu_read_unlock(&connections_srcu, idx);
  484. cond_resched();
  485. return 0;
  486. }
  487. int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
  488. {
  489. struct connection *con;
  490. int idx;
  491. idx = srcu_read_lock(&connections_srcu);
  492. con = nodeid2con(nodeid, 0);
  493. if (!con) {
  494. srcu_read_unlock(&connections_srcu, idx);
  495. return -ENOENT;
  496. }
  497. spin_lock(&con->addrs_lock);
  498. con->mark = mark;
  499. spin_unlock(&con->addrs_lock);
  500. srcu_read_unlock(&connections_srcu, idx);
  501. return 0;
  502. }
/*
 * sk_error_report callback installed on connection sockets by add_sock().
 * Logs the error (ratelimited) with the peer address for AF_INET/AF_INET6,
 * calls dlm_midcomms_unack_msg_resend() for the node, and then chains to
 * the saved listen_sock.sk_error_report.
 */
static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;
	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
	}
	dlm_midcomms_unack_msg_resend(con->nodeid);
	/* let the original handler run as well */
	listen_sock.sk_error_report(sk);
}
/*
 * Detach @sk from its connection: clear sk_user_data and reinstall the
 * callbacks saved in listen_sock. The caller must hold the socket lock,
 * which is checked under CONFIG_LOCKDEP.
 */
static void restore_callbacks(struct sock *sk)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
#endif
	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
}
/*
 * Make a socket active: attach @sock to @con and install the lowcomms
 * callbacks, all under the socket lock.
 */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;
	lock_sock(sk);
	con->sock = sock;
	/* sock2con() reads this back in the callbacks below */
	sk->sk_user_data = con;
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	/* SCTP does not signal connection end through sk_data_ready */
	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
		sk->sk_state_change = lowcomms_state_change;
	/* NOTE(review): presumably GFP_NOFS to avoid filesystem reclaim
	 * recursion, with task frags disabled to match — confirm
	 */
	sk->sk_allocation = GFP_NOFS;
	sk->sk_use_task_frag = false;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}
  565. /* Add the port number to an IPv6 or 4 sockaddr and return the address
  566. length */
  567. static void make_sockaddr(struct sockaddr_storage *saddr, __be16 port,
  568. int *addr_len)
  569. {
  570. saddr->ss_family = dlm_local_addr[0].ss_family;
  571. if (saddr->ss_family == AF_INET) {
  572. struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
  573. in4_addr->sin_port = port;
  574. *addr_len = sizeof(struct sockaddr_in);
  575. memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
  576. } else {
  577. struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
  578. in6_addr->sin6_port = port;
  579. *addr_len = sizeof(struct sockaddr_in6);
  580. }
  581. memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
  582. }
  583. static void dlm_page_release(struct kref *kref)
  584. {
  585. struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
  586. ref);
  587. __free_page(e->page);
  588. dlm_free_writequeue(e);
  589. }
  590. static void dlm_msg_release(struct kref *kref)
  591. {
  592. struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
  593. kref_put(&msg->entry->ref, dlm_page_release);
  594. dlm_free_msg(msg);
  595. }
  596. static void free_entry(struct writequeue_entry *e)
  597. {
  598. struct dlm_msg *msg, *tmp;
  599. list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
  600. if (msg->orig_msg) {
  601. msg->orig_msg->retransmit = false;
  602. kref_put(&msg->orig_msg->ref, dlm_msg_release);
  603. }
  604. list_del(&msg->list);
  605. kref_put(&msg->ref, dlm_msg_release);
  606. }
  607. list_del(&e->list);
  608. kref_put(&e->ref, dlm_page_release);
  609. }
  610. static void dlm_close_sock(struct socket **sock)
  611. {
  612. lock_sock((*sock)->sk);
  613. restore_callbacks((*sock)->sk);
  614. release_sock((*sock)->sk);
  615. sock_release(*sock);
  616. *sock = NULL;
  617. }
  618. static void allow_connection_io(struct connection *con)
  619. {
  620. if (con->othercon)
  621. clear_bit(CF_IO_STOP, &con->othercon->flags);
  622. clear_bit(CF_IO_STOP, &con->flags);
  623. }
/*
 * Stop all I/O on @con, handling its othercon first: set CF_IO_STOP so no
 * new work is queued, detach the socket callbacks, then cancel any queued
 * send/receive work and wait for it.
 */
static void stop_connection_io(struct connection *con)
{
	if (con->othercon)
		stop_connection_io(con->othercon);
	/* taken so this serialises with lowcomms_queue_swork(), which
	 * requires writequeue_lock
	 */
	spin_lock_bh(&con->writequeue_lock);
	set_bit(CF_IO_STOP, &con->flags);
	spin_unlock_bh(&con->writequeue_lock);
	down_write(&con->sock_lock);
	if (con->sock) {
		/* stop socket callbacks from queuing more work */
		lock_sock(con->sock->sk);
		restore_callbacks(con->sock->sk);
		release_sock(con->sock->sk);
	}
	up_write(&con->sock_lock);
	cancel_work_sync(&con->swork);
	cancel_work_sync(&con->rwork);
}
/*
 * Close a remote connection and tidy up: release @con's socket, drop a
 * partially sent (dirty) head entry, and reset the receive/retry state and
 * pending flags. With @and_other, @con->othercon is closed first. If @con
 * has no socket, nothing is done.
 */
static void close_connection(struct connection *con, bool and_other)
{
	struct writequeue_entry *e;
	if (con->othercon && and_other)
		close_connection(con->othercon, false);
	down_write(&con->sock_lock);
	if (!con->sock) {
		up_write(&con->sock_lock);
		return;
	}
	dlm_close_sock(&con->sock);
	/* If we sent a writequeue entry only half way, we drop the whole
	 * entry: after reconnecting we must not start in the middle of a
	 * msg, which would confuse the other end.
	 *
	 * We can always drop messages because of retransmits, but what we
	 * cannot allow is to transmit half messages which may be processed
	 * at the other side.
	 *
	 * Our policy is to start from a clean state on disconnect; we don't
	 * know what was sent/received at the transport layer in this case.
	 */
	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);
	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_RECV_PENDING, &con->flags);
	clear_bit(CF_SEND_PENDING, &con->flags);
	up_write(&con->sock_lock);
}
  679. static void shutdown_connection(struct connection *con, bool and_other)
  680. {
  681. int ret;
  682. if (con->othercon && and_other)
  683. shutdown_connection(con->othercon, false);
  684. flush_workqueue(io_workqueue);
  685. down_read(&con->sock_lock);
  686. /* nothing to shutdown */
  687. if (!con->sock) {
  688. up_read(&con->sock_lock);
  689. return;
  690. }
  691. ret = kernel_sock_shutdown(con->sock, dlm_proto_ops->how);
  692. up_read(&con->sock_lock);
  693. if (ret) {
  694. log_print("Connection %p failed to shutdown: %d will force close",
  695. con, ret);
  696. goto force_close;
  697. } else {
  698. ret = wait_event_timeout(con->shutdown_wait, !con->sock,
  699. DLM_SHUTDOWN_WAIT_TIMEOUT);
  700. if (ret == 0) {
  701. log_print("Connection %p shutdown timed out, will force close",
  702. con);
  703. goto force_close;
  704. }
  705. }
  706. return;
  707. force_close:
  708. close_connection(con, false);
  709. }
  710. static struct processqueue_entry *new_processqueue_entry(int nodeid,
  711. int buflen)
  712. {
  713. struct processqueue_entry *pentry;
  714. pentry = kmalloc_obj(*pentry, GFP_NOFS);
  715. if (!pentry)
  716. return NULL;
  717. pentry->buf = kmalloc(buflen, GFP_NOFS);
  718. if (!pentry->buf) {
  719. kfree(pentry);
  720. return NULL;
  721. }
  722. pentry->nodeid = nodeid;
  723. return pentry;
  724. }
  /* Free an entry from new_processqueue_entry(): its buffer, then the entry. */
static void free_processqueue_entry(struct processqueue_entry *pentry)
{
	kfree(pentry->buf);
	kfree(pentry);
}
  730. static void process_dlm_messages(struct work_struct *work)
  731. {
  732. struct processqueue_entry *pentry;
  733. spin_lock_bh(&processqueue_lock);
  734. pentry = list_first_entry_or_null(&processqueue,
  735. struct processqueue_entry, list);
  736. if (WARN_ON_ONCE(!pentry)) {
  737. process_dlm_messages_pending = false;
  738. spin_unlock_bh(&processqueue_lock);
  739. return;
  740. }
  741. list_del(&pentry->list);
  742. if (atomic_dec_and_test(&processqueue_count))
  743. wake_up(&processqueue_wq);
  744. spin_unlock_bh(&processqueue_lock);
  745. for (;;) {
  746. dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
  747. pentry->buflen);
  748. free_processqueue_entry(pentry);
  749. spin_lock_bh(&processqueue_lock);
  750. pentry = list_first_entry_or_null(&processqueue,
  751. struct processqueue_entry, list);
  752. if (!pentry) {
  753. process_dlm_messages_pending = false;
  754. spin_unlock_bh(&processqueue_lock);
  755. break;
  756. }
  757. list_del(&pentry->list);
  758. if (atomic_dec_and_test(&processqueue_count))
  759. wake_up(&processqueue_wq);
  760. spin_unlock_bh(&processqueue_lock);
  761. }
  762. }
  /* Data received from remote end */
/*
 * receive_from_sock - do one non-blocking read on @con and queue the data
 * @con: connection to read from. The visible caller, process_recv_sockets(),
 *	 holds con->sock_lock for read.
 * @buflen: size of the receive buffer to allocate
 *
 * Bytes left over from the previous read (an incomplete message) are copied
 * to the start of the new buffer. After the read, dlm_validate_incoming_buffer()
 * decides how many leading bytes get queued on processqueue. The remainder
 * is saved in con->rx_leftover_buf for the next call.
 *
 * Return: DLM_IO_SUCCESS when data was queued; DLM_IO_FLUSH when more than
 * DLM_MAX_PROCESS_BUFFERS entries are now queued; DLM_IO_END when no data is
 * available (CF_RECV_PENDING is cleared); DLM_IO_EOF when recvmsg returned
 * 0; DLM_IO_RESCHED when the entry could not be allocated; otherwise a
 * negative error code.
 */
static int receive_from_sock(struct connection *con, int buflen)
{
	struct processqueue_entry *pentry;
	int ret, buflen_real;
	struct msghdr msg;
	struct kvec iov;

	pentry = new_processqueue_entry(con->nodeid, buflen);
	if (!pentry)
		return DLM_IO_RESCHED;

	/* start the buffer with the bytes carried over from the last read */
	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);

	/* receive into the rest of the buffer, right after the leftover
	 * bytes
	 */
	iov.iov_base = pentry->buf + con->rx_leftover;
	iov.iov_len = buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	clear_bit(CF_RECV_INTR, &con->flags);
again:
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	trace_dlm_recv(con->nodeid, ret);
	if (ret == -EAGAIN) {
		/* CF_RECV_INTR is set elsewhere (presumably by the socket's
		 * data-ready callback, not visible here). Re-check it under
		 * the socket lock. If it was set after we cleared it above,
		 * data may have arrived after the -EAGAIN, so read again
		 * instead of going idle.
		 */
		lock_sock(con->sock->sk);
		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
			release_sock(con->sock->sk);
			goto again;
		}

		clear_bit(CF_RECV_PENDING, &con->flags);
		release_sock(con->sock->sk);
		free_processqueue_entry(pentry);
		return DLM_IO_END;
	} else if (ret == 0) {
		/* close will clear CF_RECV_PENDING */
		free_processqueue_entry(pentry);
		return DLM_IO_EOF;
	} else if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	/* total valid bytes: what was just read plus the leftover prefix */
	buflen_real = ret + con->rx_leftover;
	/* ret becomes the number of leading bytes handed to processing;
	 * the bytes after them are carried over to the next read
	 */
	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
					   buflen_real);
	if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	pentry->buflen = ret;

	/* Save the bytes after the processed part in rx_leftover_buf. The
	 * next call copies them to the start of its buffer, so the next
	 * message again begins at the start of the receive buffer.
	 */
	con->rx_leftover = buflen_real - ret;
	memmove(con->rx_leftover_buf, pentry->buf + ret,
		con->rx_leftover);

	spin_lock_bh(&processqueue_lock);
	ret = atomic_inc_return(&processqueue_count);
	list_add_tail(&pentry->list, &processqueue);
	/* queue the process worker only if it is not already pending */
	if (!process_dlm_messages_pending) {
		process_dlm_messages_pending = true;
		queue_work(process_workqueue, &process_work);
	}
	spin_unlock_bh(&processqueue_lock);

	/* too many queued buffers: ask the caller to back off */
	if (ret > DLM_MAX_PROCESS_BUFFERS)
		return DLM_IO_FLUSH;

	return DLM_IO_SUCCESS;
}
  /* Listening socket is busy, accept a connection */
/*
 * accept_from_sock - accept one pending connection on the listening socket
 *
 * Maps the peer address to a cluster node id and attaches the new socket to
 * that node's connection. If the connection already has a socket (both
 * nodes connected at about the same time), the new socket is attached to
 * the connection's "othercon", which is allocated on first use.
 *
 * Return: DLM_IO_SUCCESS when a connection was accepted, DLM_IO_END when
 * none is pending, -1 for a peer that is not a cluster node, or a negative
 * error code. process_listen_recv_socket() keeps calling this while it
 * returns DLM_IO_SUCCESS.
 */
static int accept_from_sock(void)
{
	struct sockaddr_storage peeraddr;
	int len, idx, result, nodeid;
	struct connection *newcon;
	/* NOTE(review): newsock is not initialised here. The accept_err path
	 * relies on kernel_accept() leaving it NULL when it fails. Confirm
	 * this, or initialise it to NULL.
	 */
	struct socket *newsock;
	unsigned int mark;

	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
	if (result == -EAGAIN)
		return DLM_IO_END;
	else if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		/* not a cluster member: log the address and drop the socket */
		switch (peeraddr.ss_family) {
		case AF_INET: {
			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;

			log_print("connect from non cluster IPv4 node %pI4",
				  &sin->sin_addr);
			break;
		}
#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;

			log_print("connect from non cluster IPv6 node %pI6c",
				  &sin6->sin6_addr);
			break;
		}
#endif
		default:
			log_print("invalid family from non cluster node");
			break;
		}

		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check whether we already have a connection to this node. This
	 * can happen if the two nodes initiate a connection at roughly the
	 * same time and the connections cross on the wire. In that case
	 * the incoming one is stored in "othercon".
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!newcon)) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOENT;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	down_write(&newcon->sock_lock);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc_obj(*othercon, GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				up_write(&newcon->sock_lock);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			dlm_con_init(othercon, nodeid);
			/* othercon's sock_lock is taken while newcon's is
			 * held, so give it its own lockdep subclass
			 */
			lockdep_set_subclass(&othercon->sock_lock, 1);
			newcon->othercon = othercon;
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false);
		}

		down_write(&othercon->sock_lock);
		add_sock(newsock, othercon);

		/* check if we received something while adding */
		lock_sock(othercon->sock->sk);
		lowcomms_queue_rwork(othercon);
		release_sock(othercon->sock->sk);
		up_write(&othercon->sock_lock);
	}
	else {
		/* accept copies the sk after we've saved the callbacks, so we
		 * don't want to save them a second time or comm errors will
		 * result in calling sk_error_report recursively.
		 */
		add_sock(newsock, newcon);

		/* check if we received something while adding */
		lock_sock(newcon->sock->sk);
		lowcomms_queue_rwork(newcon);
		release_sock(newcon->sock->sk);
	}
	up_write(&newcon->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	return DLM_IO_SUCCESS;

accept_err:
	if (newsock)
		sock_release(newsock);

	return result;
}
  937. /*
  938. * writequeue_entry_complete - try to delete and free write queue entry
  939. * @e: write queue entry to try to delete
  940. * @completed: bytes completed
  941. *
  942. * writequeue_lock must be held.
  943. */
  944. static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
  945. {
  946. e->offset += completed;
  947. e->len -= completed;
  948. /* signal that page was half way transmitted */
  949. e->dirty = true;
  950. if (e->len == 0 && e->users == 0)
  951. free_entry(e);
  952. }
  953. /*
  954. * sctp_bind_addrs - bind a SCTP socket to all our addresses
  955. */
  956. static int sctp_bind_addrs(struct socket *sock, __be16 port)
  957. {
  958. struct sockaddr_storage localaddr;
  959. struct sockaddr_unsized *addr = (struct sockaddr_unsized *)&localaddr;
  960. int i, addr_len, result = 0;
  961. for (i = 0; i < dlm_local_count; i++) {
  962. memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
  963. make_sockaddr(&localaddr, port, &addr_len);
  964. if (!i)
  965. result = kernel_bind(sock, addr, addr_len);
  966. else
  967. result = sock_bind_add(sock->sk, addr, addr_len);
  968. if (result < 0) {
  969. log_print("Can't bind to %d addr number %d, %d.\n",
  970. port, i + 1, result);
  971. break;
  972. }
  973. }
  974. return result;
  975. }
  976. /* Get local addresses */
  977. static void init_local(void)
  978. {
  979. struct sockaddr_storage sas;
  980. int i;
  981. dlm_local_count = 0;
  982. for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
  983. if (dlm_our_addr(&sas, i))
  984. break;
  985. memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
  986. }
  987. }
  988. static struct writequeue_entry *new_writequeue_entry(struct connection *con)
  989. {
  990. struct writequeue_entry *entry;
  991. entry = dlm_allocate_writequeue();
  992. if (!entry)
  993. return NULL;
  994. entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
  995. if (!entry->page) {
  996. dlm_free_writequeue(entry);
  997. return NULL;
  998. }
  999. entry->offset = 0;
  1000. entry->len = 0;
  1001. entry->end = 0;
  1002. entry->dirty = false;
  1003. entry->con = con;
  1004. entry->users = 1;
  1005. kref_init(&entry->ref);
  1006. return entry;
  1007. }
  1008. static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
  1009. char **ppc, void (*cb)(void *data),
  1010. void *data)
  1011. {
  1012. struct writequeue_entry *e;
  1013. spin_lock_bh(&con->writequeue_lock);
  1014. if (!list_empty(&con->writequeue)) {
  1015. e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
  1016. if (DLM_WQ_REMAIN_BYTES(e) >= len) {
  1017. kref_get(&e->ref);
  1018. *ppc = page_address(e->page) + e->end;
  1019. if (cb)
  1020. cb(data);
  1021. e->end += len;
  1022. e->users++;
  1023. goto out;
  1024. }
  1025. }
  1026. e = new_writequeue_entry(con);
  1027. if (!e)
  1028. goto out;
  1029. kref_get(&e->ref);
  1030. *ppc = page_address(e->page);
  1031. e->end += len;
  1032. if (cb)
  1033. cb(data);
  1034. list_add_tail(&e->list, &con->writequeue);
  1035. out:
  1036. spin_unlock_bh(&con->writequeue_lock);
  1037. return e;
  1038. };
  1039. static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
  1040. char **ppc, void (*cb)(void *data),
  1041. void *data)
  1042. {
  1043. struct writequeue_entry *e;
  1044. struct dlm_msg *msg;
  1045. msg = dlm_allocate_msg();
  1046. if (!msg)
  1047. return NULL;
  1048. kref_init(&msg->ref);
  1049. e = new_wq_entry(con, len, ppc, cb, data);
  1050. if (!e) {
  1051. dlm_free_msg(msg);
  1052. return NULL;
  1053. }
  1054. msg->retransmit = false;
  1055. msg->orig_msg = NULL;
  1056. msg->ppc = *ppc;
  1057. msg->len = len;
  1058. msg->entry = e;
  1059. return msg;
  1060. }
  1061. /* avoid false positive for nodes_srcu, unlock happens in
  1062. * dlm_lowcomms_commit_msg which is a must call if success
  1063. */
  1064. #ifndef __CHECKER__
  1065. struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
  1066. void (*cb)(void *data), void *data)
  1067. {
  1068. struct connection *con;
  1069. struct dlm_msg *msg;
  1070. int idx;
  1071. if (len > DLM_MAX_SOCKET_BUFSIZE ||
  1072. len < sizeof(struct dlm_header)) {
  1073. BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
  1074. log_print("failed to allocate a buffer of size %d", len);
  1075. WARN_ON_ONCE(1);
  1076. return NULL;
  1077. }
  1078. idx = srcu_read_lock(&connections_srcu);
  1079. con = nodeid2con(nodeid, 0);
  1080. if (WARN_ON_ONCE(!con)) {
  1081. srcu_read_unlock(&connections_srcu, idx);
  1082. return NULL;
  1083. }
  1084. msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
  1085. if (!msg) {
  1086. srcu_read_unlock(&connections_srcu, idx);
  1087. return NULL;
  1088. }
  1089. /* for dlm_lowcomms_commit_msg() */
  1090. kref_get(&msg->ref);
  1091. /* we assume if successful commit must called */
  1092. msg->idx = idx;
  1093. return msg;
  1094. }
  1095. #endif
  1096. static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
  1097. {
  1098. struct writequeue_entry *e = msg->entry;
  1099. struct connection *con = e->con;
  1100. int users;
  1101. spin_lock_bh(&con->writequeue_lock);
  1102. kref_get(&msg->ref);
  1103. list_add(&msg->list, &e->msgs);
  1104. users = --e->users;
  1105. if (users)
  1106. goto out;
  1107. e->len = DLM_WQ_LENGTH_BYTES(e);
  1108. lowcomms_queue_swork(con);
  1109. out:
  1110. spin_unlock_bh(&con->writequeue_lock);
  1111. return;
  1112. }
  /* Avoid a sparse (__CHECKER__) false positive for connections_srcu: the
 * read lock released here was taken in dlm_lowcomms_new_msg().
 */
#ifndef __CHECKER__
/*
 * dlm_lowcomms_commit_msg - queue a message from dlm_lowcomms_new_msg()
 *
 * Commits the message to its writequeue entry. It then releases the
 * connections_srcu read lock and the extra message reference, both taken
 * by dlm_lowcomms_new_msg().
 */
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	/* Connections are freed with call_srcu() on connections_srcu (see
	 * dlm_lowcomms_close()), so unlock only after the commit has used
	 * msg->entry->con.
	 */
	srcu_read_unlock(&connections_srcu, msg->idx);
	/* drop the reference taken in dlm_lowcomms_new_msg() */
	kref_put(&msg->ref, dlm_msg_release);
}
#endif
  /* Drop one reference on @msg; dlm_msg_release() runs when the last is gone. */
void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}
  1129. /* does not held connections_srcu, usage lowcomms_error_report only */
  1130. int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
  1131. {
  1132. struct dlm_msg *msg_resend;
  1133. char *ppc;
  1134. if (msg->retransmit)
  1135. return 1;
  1136. msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
  1137. NULL, NULL);
  1138. if (!msg_resend)
  1139. return -ENOMEM;
  1140. msg->retransmit = true;
  1141. kref_get(&msg->ref);
  1142. msg_resend->orig_msg = msg;
  1143. memcpy(ppc, msg->ppc, msg->len);
  1144. _dlm_lowcomms_commit_msg(msg_resend);
  1145. dlm_lowcomms_put_msg(msg_resend);
  1146. return 0;
  1147. }
  /* Send a message */
/*
 * send_to_sock - push the next writequeue entry out on @con's socket
 * @con: connection to send on. The visible caller, process_send_sockets(),
 *	 holds con->sock_lock for read.
 *
 * Sends e->len bytes from e->offset of the entry returned by con_next_wq(),
 * using MSG_DONTWAIT. Bytes accepted by the socket are accounted with
 * writequeue_entry_complete().
 *
 * Return: DLM_IO_SUCCESS after a send of at least one byte; DLM_IO_END when
 * there is nothing to send, or when we must wait for write space
 * (CF_SEND_PENDING is cleared in both cases); DLM_IO_RESCHED when the send
 * should simply be retried; or a negative error from sock_sendmsg().
 */
static int send_to_sock(struct connection *con)
{
	struct writequeue_entry *e;
	struct bio_vec bvec;
	struct msghdr msg = {
		.msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
	};
	int len, offset, ret;

	spin_lock_bh(&con->writequeue_lock);
	e = con_next_wq(con);
	if (!e) {
		clear_bit(CF_SEND_PENDING, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		return DLM_IO_END;
	}

	/* snapshot the range to send while writequeue_lock is held */
	len = e->len;
	offset = e->offset;
	WARN_ON_ONCE(len == 0 && e->users == 0);
	spin_unlock_bh(&con->writequeue_lock);

	bvec_set_page(&bvec, e->page, len, offset);
	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
	ret = sock_sendmsg(con->sock, &msg);
	trace_dlm_send(con->nodeid, ret);
	if (ret == -EAGAIN || ret == 0) {
		lock_sock(con->sock->sk);
		spin_lock_bh(&con->writequeue_lock);
		/* socket reports no space and we were not yet marked
		 * application-limited: mark it and wait for write space
		 */
		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
			/* Notify TCP that we're limited by the
			 * application window size.
			 */
			set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
			con->sock->sk->sk_write_pending++;

			clear_bit(CF_SEND_PENDING, &con->flags);
			spin_unlock_bh(&con->writequeue_lock);
			release_sock(con->sock->sk);

			/* wait for write_space() event */
			return DLM_IO_END;
		}
		spin_unlock_bh(&con->writequeue_lock);
		release_sock(con->sock->sk);

		/* otherwise keep CF_SEND_PENDING and let the caller retry */
		return DLM_IO_RESCHED;
	} else if (ret < 0) {
		return ret;
	}

	spin_lock_bh(&con->writequeue_lock);
	writequeue_entry_complete(e, ret);
	spin_unlock_bh(&con->writequeue_lock);

	return DLM_IO_SUCCESS;
}
  1199. static void clean_one_writequeue(struct connection *con)
  1200. {
  1201. struct writequeue_entry *e, *safe;
  1202. spin_lock_bh(&con->writequeue_lock);
  1203. list_for_each_entry_safe(e, safe, &con->writequeue, list) {
  1204. free_entry(e);
  1205. }
  1206. spin_unlock_bh(&con->writequeue_lock);
  1207. }
  1208. static void connection_release(struct rcu_head *rcu)
  1209. {
  1210. struct connection *con = container_of(rcu, struct connection, rcu);
  1211. WARN_ON_ONCE(!list_empty(&con->writequeue));
  1212. WARN_ON_ONCE(con->sock);
  1213. kfree(con);
  1214. }
  /*
 * dlm_lowcomms_close - stop, close and free the connection to @nodeid
 *
 * Called from recovery when it knows that a node has left the cluster.
 * Calls stop_connection_io(), closes the connection and its othercon,
 * removes the connection from the hash, drops all queued writes and
 * schedules both connections for freeing with call_srcu().
 *
 * Return: 0, or -ENOENT if there is no connection for @nodeid.
 */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	int idx;

	log_print("closing connection to node %d", nodeid);

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	stop_connection_io(con);
	log_print("io handling for node: %d stopped", nodeid);
	close_connection(con, true);

	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);

	clean_one_writequeue(con);
	call_srcu(&connections_srcu, &con->rcu, connection_release);
	/* con is still safe to use here: connection_release() cannot run
	 * before our connections_srcu read section ends
	 */
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
	}
	srcu_read_unlock(&connections_srcu, idx);

	/* For debugging, log when we are done so this can be compared with
	 * other messages in between. This function needs to be correctly
	 * synchronized with io handling.
	 */
	log_print("closing connection to node %d done", nodeid);

	return 0;
}
  /* Receive worker function */
/*
 * process_recv_sockets - receive work (con->rwork) for one connection
 *
 * Reads from the socket until receive_from_sock() stops returning
 * DLM_IO_SUCCESS, then acts on the final result:
 * - on EOF it closes the connection and wakes shutdown_wait;
 * - on DLM_IO_FLUSH and DLM_IO_RESCHED it backs off and requeues itself;
 * - on errors it closes an othercon, or for a normal connection queues
 *   the send work, which handles the broken socket.
 */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int ret, buflen;

	down_read(&con->sock_lock);
	if (!con->sock) {
		up_read(&con->sock_lock);
		return;
	}

	/* read the config once so every receive in this run uses one size */
	buflen = READ_ONCE(dlm_config.ci_buffer_size);
	do {
		ret = receive_from_sock(con, buflen);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_RECV_PENDING cleared */
		break;
	case DLM_IO_EOF:
		close_connection(con, false);
		wake_up(&con->shutdown_wait);
		/* CF_RECV_PENDING cleared */
		break;
	case DLM_IO_FLUSH:
		/* We can't flush process_workqueue here: flushing a non
		 * WQ_MEM_RECLAIM workqueue such as process_workqueue from a
		 * WQ_MEM_RECLAIM workqueue can deadlock. Instead we wait on
		 * processqueue_wq until all queued messages are processed.
		 *
		 * This is only needed to back off the sender, so that we do
		 * not queue every message from the socket layer into the DLM
		 * processqueue. Once DLM can parse multiple messages, e.g. on
		 * a per-socket basis, this handling might be removed.
		 * Especially during a message burst we are too slow to process
		 * messages, and the queue would fill up memory.
		 */
		wait_event(processqueue_wq, !atomic_read(&processqueue_count));
		fallthrough;
	case DLM_IO_RESCHED:
		cond_resched();
		queue_work(io_workqueue, &con->rwork);
		/* CF_RECV_PENDING not cleared */
		break;
	default:
		if (ret < 0) {
			if (test_bit(CF_IS_OTHERCON, &con->flags)) {
				close_connection(con, false);
			} else {
				spin_lock_bh(&con->writequeue_lock);
				lowcomms_queue_swork(con);
				spin_unlock_bh(&con->writequeue_lock);
			}

			/* CF_RECV_PENDING is cleared for othercon. Otherwise
			 * we trigger the send queue if not already done, and
			 * process_send_sockets will handle it.
			 */
			break;
		}

		WARN_ON_ONCE(1);
		break;
	}
}
  1312. static void process_listen_recv_socket(struct work_struct *work)
  1313. {
  1314. int ret;
  1315. if (WARN_ON_ONCE(!listen_con.sock))
  1316. return;
  1317. do {
  1318. ret = accept_from_sock();
  1319. } while (ret == DLM_IO_SUCCESS);
  1320. if (ret < 0)
  1321. log_print("critical error accepting connection: %d", ret);
  1322. }
  1323. static int dlm_connect(struct connection *con)
  1324. {
  1325. struct sockaddr_storage addr;
  1326. int result, addr_len;
  1327. struct socket *sock;
  1328. unsigned int mark;
  1329. memset(&addr, 0, sizeof(addr));
  1330. result = nodeid_to_addr(con->nodeid, &addr, NULL,
  1331. dlm_proto_ops->try_new_addr, &mark);
  1332. if (result < 0) {
  1333. log_print("no address for nodeid %d", con->nodeid);
  1334. return result;
  1335. }
  1336. /* Create a socket to communicate with */
  1337. result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
  1338. SOCK_STREAM, dlm_proto_ops->proto, &sock);
  1339. if (result < 0)
  1340. return result;
  1341. sock_set_mark(sock->sk, mark);
  1342. dlm_proto_ops->sockopts(sock);
  1343. result = dlm_proto_ops->bind(sock);
  1344. if (result < 0) {
  1345. sock_release(sock);
  1346. return result;
  1347. }
  1348. add_sock(sock, con);
  1349. log_print_ratelimited("connecting to %d", con->nodeid);
  1350. make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
  1351. result = kernel_connect(sock, (struct sockaddr_unsized *)&addr, addr_len, 0);
  1352. switch (result) {
  1353. case -EINPROGRESS:
  1354. /* not an error */
  1355. fallthrough;
  1356. case 0:
  1357. break;
  1358. default:
  1359. if (result < 0)
  1360. dlm_close_sock(&con->sock);
  1361. break;
  1362. }
  1363. return result;
  1364. }
  /* Send worker function */
/*
 * process_send_sockets - send work (con->swork) for one connection
 *
 * If the connection has no socket yet, the lock is retaken for writing,
 * the socket is re-checked, and a connect is attempted. On connect failure
 * the work sleeps for one second and requeues itself, retrying forever.
 * Otherwise it sends entries until send_to_sock() stops returning
 * DLM_IO_SUCCESS and then acts on the final result. Must never run for an
 * othercon; it warns once if it does.
 */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);
	int ret;

	WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));

	down_read(&con->sock_lock);
	if (!con->sock) {
		/* Connecting needs the write lock. Re-check afterwards, since
		 * the lock was dropped in between.
		 */
		up_read(&con->sock_lock);
		down_write(&con->sock_lock);
		if (!con->sock) {
			ret = dlm_connect(con);
			/* NOTE(review): dlm_connect() treats -EINPROGRESS as
			 * "not an error" and keeps the socket, but this switch
			 * handles it like a failure (log, sleep, requeue); the
			 * requeued work then finds con->sock set. Confirm
			 * whether -EINPROGRESS should take the case 0 path.
			 */
			switch (ret) {
			case 0:
				break;
			default:
				/* CF_SEND_PENDING not cleared */
				up_write(&con->sock_lock);
				log_print("connect to node %d try %d error %d",
					  con->nodeid, con->retries++, ret);
				msleep(1000);
				/* For now we try forever to reconnect. In
				 * future we should send a event to cluster
				 * manager to fence itself after certain amount
				 * of retries.
				 */
				queue_work(io_workqueue, &con->swork);
				return;
			}
		}
		/* keep the lock, but only for read, while sending */
		downgrade_write(&con->sock_lock);
	}

	do {
		ret = send_to_sock(con);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_SEND_PENDING cleared */
		break;
	case DLM_IO_RESCHED:
		/* CF_SEND_PENDING not cleared */
		cond_resched();
		queue_work(io_workqueue, &con->swork);
		break;
	default:
		if (ret < 0) {
			close_connection(con, false);

			/* CF_SEND_PENDING cleared; requeue to reconnect */
			spin_lock_bh(&con->writequeue_lock);
			lowcomms_queue_swork(con);
			spin_unlock_bh(&con->writequeue_lock);
			break;
		}

		WARN_ON_ONCE(1);
		break;
	}
}
  /*
 * work_stop - destroy whichever DLM workqueues exist and clear the
 * pointers, so calling it again (or after a partial work_start()) is safe.
 */
static void work_stop(void)
{
	if (io_workqueue) {
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
	}

	if (process_workqueue) {
		destroy_workqueue(process_workqueue);
		process_workqueue = NULL;
	}
}
  1434. static int work_start(void)
  1435. {
  1436. io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM |
  1437. WQ_UNBOUND, 0);
  1438. if (!io_workqueue) {
  1439. log_print("can't start dlm_io");
  1440. return -ENOMEM;
  1441. }
  1442. process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH | WQ_PERCPU, 0);
  1443. if (!process_workqueue) {
  1444. log_print("can't start dlm_process");
  1445. destroy_workqueue(io_workqueue);
  1446. io_workqueue = NULL;
  1447. return -ENOMEM;
  1448. }
  1449. return 0;
  1450. }
  /*
 * dlm_lowcomms_shutdown - stop accepting and shut down every connection
 *
 * First restores the listening socket's saved sk_data_ready callback, so
 * lowcomms_listen_data_ready is no longer called. It then waits for running
 * accept work with cancel_work_sync() and closes the listening socket.
 *
 * Each hashed connection then goes through these steps:
 * shutdown_connection(), stop_connection_io(), a flush of
 * process_workqueue, close_connection(), cleaning of its writequeue and
 * its othercon's writequeue, and finally allow_connection_io().
 */
void dlm_lowcomms_shutdown(void)
{
	struct connection *con;
	int i, idx;

	/* stop lowcomms_listen_data_ready calls */
	lock_sock(listen_con.sock->sk);
	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
	release_sock(listen_con.sock->sk);

	cancel_work_sync(&listen_con.rwork);
	dlm_close_sock(&listen_con.sock);

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			shutdown_connection(con, true);
			stop_connection_io(con);
			/* let already queued messages be processed first */
			flush_workqueue(process_workqueue);
			close_connection(con, true);

			clean_one_writequeue(con);
			if (con->othercon)
				clean_one_writequeue(con->othercon);
			allow_connection_io(con);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}
  /* Destroy the workqueues and forget the selected transport (dlm_proto_ops). */
void dlm_lowcomms_stop(void)
{
	work_stop();
	dlm_proto_ops = NULL;
}
  1481. static int dlm_listen_for_all(void)
  1482. {
  1483. struct socket *sock;
  1484. int result;
  1485. log_print("Using %s for communications",
  1486. dlm_proto_ops->name);
  1487. result = dlm_proto_ops->listen_validate();
  1488. if (result < 0)
  1489. return result;
  1490. result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
  1491. SOCK_STREAM, dlm_proto_ops->proto, &sock);
  1492. if (result < 0) {
  1493. log_print("Can't create comms socket: %d", result);
  1494. return result;
  1495. }
  1496. sock_set_mark(sock->sk, dlm_config.ci_mark);
  1497. dlm_proto_ops->listen_sockopts(sock);
  1498. result = dlm_proto_ops->listen_bind(sock);
  1499. if (result < 0)
  1500. goto out;
  1501. lock_sock(sock->sk);
  1502. listen_sock.sk_data_ready = sock->sk->sk_data_ready;
  1503. listen_sock.sk_write_space = sock->sk->sk_write_space;
  1504. listen_sock.sk_error_report = sock->sk->sk_error_report;
  1505. listen_sock.sk_state_change = sock->sk->sk_state_change;
  1506. listen_con.sock = sock;
  1507. sock->sk->sk_allocation = GFP_NOFS;
  1508. sock->sk->sk_use_task_frag = false;
  1509. sock->sk->sk_data_ready = lowcomms_listen_data_ready;
  1510. release_sock(sock->sk);
  1511. result = sock->ops->listen(sock, 128);
  1512. if (result < 0) {
  1513. dlm_close_sock(&listen_con.sock);
  1514. return result;
  1515. }
  1516. return 0;
  1517. out:
  1518. sock_release(sock);
  1519. return result;
  1520. }
  1521. static int dlm_tcp_bind(struct socket *sock)
  1522. {
  1523. struct sockaddr_storage src_addr;
  1524. int result, addr_len;
  1525. /* Bind to our cluster-known address connecting to avoid
  1526. * routing problems.
  1527. */
  1528. memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
  1529. make_sockaddr(&src_addr, 0, &addr_len);
  1530. result = kernel_bind(sock, (struct sockaddr_unsized *)&src_addr,
  1531. addr_len);
  1532. if (result < 0) {
  1533. /* This *may* not indicate a critical error */
  1534. log_print("could not bind for connect: %d", result);
  1535. }
  1536. return 0;
  1537. }
  1538. static int dlm_tcp_listen_validate(void)
  1539. {
  1540. /* We don't support multi-homed hosts */
  1541. if (dlm_local_count > 1) {
  1542. log_print("Detect multi-homed hosts but use only the first IP address.");
  1543. log_print("Try SCTP, if you want to enable multi-link.");
  1544. }
  1545. return 0;
  1546. }
  /* Socket options for connecting TCP sockets. */
static void dlm_tcp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);
}
  /* Listening TCP sockets: the connect-side options plus address reuse. */
static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	sock_set_reuseaddr(sock->sk);
}
  /*
 * dlm_tcp_listen_bind - bind the TCP listening socket to our first local
 * address on the configured port (dlm_config.ci_tcp_port).
 *
 * NOTE(review): unlike dlm_tcp_bind(), make_sockaddr() is applied to the
 * global dlm_local_addr[0] itself rather than to a copy, so the port is
 * presumably written into that global entry.
 */
static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return kernel_bind(sock, (struct sockaddr_unsized *)&dlm_local_addr[0],
			   addr_len);
}
  /*
 * TCP transport. Only dlm_local_addr[0] is used. .how is the mode that
 * shutdown_connection() passes to kernel_sock_shutdown(); SHUT_WR shuts
 * down only the sending direction. .try_new_addr is left false.
 */
static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.how = SHUT_WR,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
};
  /* Bind a connecting SCTP socket to all local addresses with port 0. */
static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}
  1579. static int dlm_sctp_listen_validate(void)
  1580. {
  1581. if (!IS_ENABLED(CONFIG_IP_SCTP)) {
  1582. log_print("SCTP is not enabled by this kernel");
  1583. return -EOPNOTSUPP;
  1584. }
  1585. request_module("sctp");
  1586. return 0;
  1587. }
  /* Bind the SCTP listening socket to all local addresses on the DLM port. */
static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}
  /* Socket options for SCTP sockets; used for both connecting and listening. */
static void dlm_sctp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);
	/* receive buffer size: NEEDED_RMEM */
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}
  /*
 * SCTP transport. It binds every local address (sctp_bind_addrs()), and
 * shutdown_connection() shuts down both directions (SHUT_RDWR).
 * .try_new_addr = true is passed to nodeid_to_addr() by dlm_connect().
 * The same option setter is used for connecting and listening sockets.
 */
static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.how = SHUT_RDWR,
	.try_new_addr = true,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};
  1609. int dlm_lowcomms_start(void)
  1610. {
  1611. int error;
  1612. init_local();
  1613. if (!dlm_local_count) {
  1614. error = -ENOTCONN;
  1615. log_print("no local IP address has been set");
  1616. goto fail;
  1617. }
  1618. error = work_start();
  1619. if (error)
  1620. goto fail;
  1621. /* Start listening */
  1622. switch (dlm_config.ci_protocol) {
  1623. case DLM_PROTO_TCP:
  1624. dlm_proto_ops = &dlm_tcp_ops;
  1625. break;
  1626. case DLM_PROTO_SCTP:
  1627. dlm_proto_ops = &dlm_sctp_ops;
  1628. break;
  1629. default:
  1630. log_print("Invalid protocol identifier %d set",
  1631. dlm_config.ci_protocol);
  1632. error = -EINVAL;
  1633. goto fail_proto_ops;
  1634. }
  1635. error = dlm_listen_for_all();
  1636. if (error)
  1637. goto fail_listen;
  1638. return 0;
  1639. fail_listen:
  1640. dlm_proto_ops = NULL;
  1641. fail_proto_ops:
  1642. work_stop();
  1643. fail:
  1644. return error;
  1645. }
  1646. void dlm_lowcomms_init(void)
  1647. {
  1648. int i;
  1649. for (i = 0; i < CONN_HASH_SIZE; i++)
  1650. INIT_HLIST_HEAD(&connection_hash[i]);
  1651. INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
  1652. }
  /*
 * dlm_lowcomms_exit - unhash and free every connection
 *
 * Each connection is removed from connection_hash. It and its othercon
 * (if any) are handed to connection_release() via call_srcu(), so they are
 * freed only after current connections_srcu readers are done.
 *
 * NOTE(review): writequeues are not cleaned here, and connection_release()
 * warns if one is still non-empty. This presumably runs after
 * dlm_lowcomms_shutdown(); confirm with the callers.
 */
void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}