data-vio.c 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056
  1. // SPDX-License-Identifier: GPL-2.0-only
  2. /*
  3. * Copyright 2023 Red Hat
  4. */
  5. #include "data-vio.h"
  6. #include <linux/atomic.h>
  7. #include <linux/bio.h>
  8. #include <linux/blkdev.h>
  9. #include <linux/delay.h>
  10. #include <linux/device-mapper.h>
  11. #include <linux/jiffies.h>
  12. #include <linux/kernel.h>
  13. #include <linux/list.h>
  14. #include <linux/lz4.h>
  15. #include <linux/minmax.h>
  16. #include <linux/sched.h>
  17. #include <linux/spinlock.h>
  18. #include <linux/string.h>
  19. #include <linux/wait.h>
  20. #include "logger.h"
  21. #include "memory-alloc.h"
  22. #include "murmurhash3.h"
  23. #include "permassert.h"
  24. #include "block-map.h"
  25. #include "dump.h"
  26. #include "encodings.h"
  27. #include "int-map.h"
  28. #include "io-submitter.h"
  29. #include "logical-zone.h"
  30. #include "packer.h"
  31. #include "recovery-journal.h"
  32. #include "slab-depot.h"
  33. #include "status-codes.h"
  34. #include "types.h"
  35. #include "vdo.h"
  36. #include "vio.h"
  37. #include "wait-queue.h"
  38. /**
  39. * DOC: Bio flags.
  40. *
  41. * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those
  42. * flags on our own bio(s) for that request may help underlying layers better fulfill the user
  43. * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other
  44. * flags, as they convey incorrect information.
  45. *
  46. * These flags are always irrelevant if we have already finished the user bio as they are only
  47. * hints on IO importance. If VDO has finished the user bio, any remaining IO done doesn't care how
  48. * important finishing the finished bio was.
  49. *
  50. * Note that bio.c contains the complete list of flags we believe may be set; the following list
  51. * explains the action taken with each of those flags VDO could receive:
  52. *
  53. * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
  54. * completion is required for further work to be done by the issuer.
  55. * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer
  56. * treats it as more urgent, similar to REQ_SYNC.
  57. * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
  58. * important.
  59. * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
  60. * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
  61. * match incoming IO, so this flag is incorrect for it.
  62. * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
  63. * * REQ_RAHEAD: Passed down, as, for reads, it indicates trivial importance.
  64. * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
  65. * ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
  66. * prioritization.
  67. */
  68. static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);
  69. /**
  70. * DOC:
  71. *
  72. * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For
  73. * correctness, and in order to avoid potentially expensive or blocking memory allocations during
  74. * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order
  75. * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for
  76. * discards. The data_vio_pool is responsible for enforcing these limits. Threads submitting bios
  77. * for which a data_vio or discard permit are not available will block until the necessary
  78. * resources are available. The pool is also responsible for distributing resources to blocked
  79. * threads and waking them. Finally, the pool attempts to batch the work of recycling data_vios by
  80. * performing the work of actually assigning resources to blocked threads or placing data_vios back
  81. * into the pool on a single cpu at a time.
  82. *
  83. * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
  84. * permits. The limiters also provide safe cross-thread access to pool statistics without the need
  85. * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
  86. * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
  87. * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
  88. * launched. However, if either of these are unavailable, the arrival time of the bio is recorded
  89. * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
  90. * limiter and the submitting thread will then put itself to sleep. (note that this mechanism will
  91. * break if jiffies are only 32 bits.)
  92. *
  93. * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
  94. * will be called on it. This function will add the data_vio to a funnel queue, and then check the
  95. * state of the pool. If the pool is not currently processing released data_vios, the pool's
  96. * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
  97. * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
  98. * threads.
  99. *
  100. * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback() which
  101. * processes a batch of returned data_vios (at most DATA_VIO_RELEASE_BATCH_SIZE) from the pool's funnel queue. For
  102. * each data_vio, it first checks whether that data_vio was processing a discard. If so, and there
  103. * is a blocked bio waiting for a discard permit, that permit is notionally transferred to the
  104. * eldest discard waiter, and that waiter is moved to the end of the list of discard bios waiting
  105. * for a data_vio. If there are no discard waiters, the discard permit is returned to the pool.
  106. * Next, the data_vio is assigned to the oldest blocked bio which either has a discard permit or
  107. * doesn't need one, and is relaunched. If neither of these exists, the data_vio is returned to the
  108. * pool. Finally, if any waiting bios were launched, the threads which blocked trying to submit
  109. * them are awakened.
  110. */
  /*
   * The size of a batch of released data_vios. NOTE(review): the consumer of this constant is not
   * visible in this part of the file; presumably process_release_callback() -- confirm.
   */
  #define DATA_VIO_RELEASE_BATCH_SIZE 128

  /* Masks a sector number down to its sector offset within a block (see launch_bio()). */
  static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
  /* The bits of the packed compression status which hold the compression stage. */
  static const u32 COMPRESSION_STATUS_MASK = 0xff;
  /* The bit of the packed compression status which records the may_not_compress flag. */
  static const u32 MAY_NOT_COMPRESS_MASK = 0x80000000;

  struct limiter;

  /* The signature of a limiter's callback for assigning one of its resources to a waiter. */
  typedef void (*assigner_fn)(struct limiter *limiter);
  117. /* Bookkeeping structure for a single type of resource. */
  118. struct limiter {
  119. /* The data_vio_pool to which this limiter belongs */
  120. struct data_vio_pool *pool;
  121. /* The maximum number of data_vios available */
  122. data_vio_count_t limit;
  123. /* The number of resources in use */
  124. data_vio_count_t busy;
  125. /* The maximum number of resources ever simultaneously in use */
  126. data_vio_count_t max_busy;
  127. /* The number of resources to release */
  128. data_vio_count_t release_count;
  129. /* The number of waiters to wake */
  130. data_vio_count_t wake_count;
  131. /* The list of waiting bios which are known to process_release_callback() */
  132. struct bio_list waiters;
  133. /* The list of waiting bios which are not yet known to process_release_callback() */
  134. struct bio_list new_waiters;
  135. /* The list of waiters which have their permits */
  136. struct bio_list *permitted_waiters;
  137. /* The function for assigning a resource to a waiter */
  138. assigner_fn assigner;
  139. /* The queue of blocked threads */
  140. wait_queue_head_t blocked_threads;
  141. /* The arrival time of the eldest waiter */
  142. u64 arrival;
  143. };
  144. /*
  145. * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread,
  146. * and are released in batches.
  147. */
  148. struct data_vio_pool {
  149. /* Completion for scheduling releases */
  150. struct vdo_completion completion;
  151. /* The administrative state of the pool */
  152. struct admin_state state;
  153. /* Lock protecting the pool */
  154. spinlock_t lock;
  155. /* The main limiter controlling the total data_vios in the pool. */
  156. struct limiter limiter;
  157. /* The limiter controlling data_vios for discard */
  158. struct limiter discard_limiter;
  159. /* The list of bios which have discard permits but still need a data_vio */
  160. struct bio_list permitted_discards;
  161. /* The list of available data_vios */
  162. struct list_head available;
  163. /* The queue of data_vios waiting to be returned to the pool */
  164. struct funnel_queue *queue;
  165. /* Whether the pool is processing, or scheduled to process releases */
  166. atomic_t processing;
  167. /* The data vios in the pool */
  168. struct data_vio data_vios[];
  169. };
  /*
   * Printable names of the asynchronous operations a data_vio may perform.
   * NOTE(review): presumably indexed by the VIO_ASYNC_OP_* value stored in last_async_operation,
   * in which case the order here must match that enum -- confirm against its declaration.
   */
  static const char * const ASYNC_OPERATION_NAMES[] = {
  	"launch",
  	"acknowledge_write",
  	"acquire_hash_lock",
  	"attempt_logical_block_lock",
  	"lock_duplicate_pbn",
  	"check_for_duplication",
  	"cleanup",
  	"compress_data_vio",
  	"find_block_map_slot",
  	"get_mapped_block_for_read",
  	"get_mapped_block_for_write",
  	"hash_data_vio",
  	"journal_remapping",
  	"vdo_attempt_packing",
  	"put_mapped_block",
  	"read_data_vio",
  	"update_dedupe_index",
  	"update_reference_counts",
  	"verify_duplication",
  	"write_data_vio",
  };
  /*
   * The steps taken cleaning up a VIO, in the order they are performed. Each enumerator after
   * VIO_CLEANUP_START takes the next consecutive value, so the declaration order is the cleanup
   * order.
   */
  enum data_vio_cleanup_stage {
  	/* Alias for the first stage, VIO_RELEASE_HASH_LOCK. */
  	VIO_CLEANUP_START,
  	VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START,
  	VIO_RELEASE_ALLOCATED,
  	VIO_RELEASE_RECOVERY_LOCKS,
  	VIO_RELEASE_LOGICAL,
  	/* The value following the last release stage. */
  	VIO_CLEANUP_DONE
  };
  201. static inline struct data_vio_pool * __must_check
  202. as_data_vio_pool(struct vdo_completion *completion)
  203. {
  204. vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION);
  205. return container_of(completion, struct data_vio_pool, completion);
  206. }
  207. static inline u64 get_arrival_time(struct bio *bio)
  208. {
  209. return (u64) bio->bi_private;
  210. }
  211. /**
  212. * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios
  213. * or waiters while holding the pool's lock.
  214. * @pool: The data_vio pool.
  215. */
  216. static bool check_for_drain_complete_locked(struct data_vio_pool *pool)
  217. {
  218. if (pool->limiter.busy > 0)
  219. return false;
  220. VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0),
  221. "no outstanding discard permits");
  222. return (bio_list_empty(&pool->limiter.new_waiters) &&
  223. bio_list_empty(&pool->discard_limiter.new_waiters));
  224. }
  225. static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn)
  226. {
  227. struct vdo *vdo = vdo_from_data_vio(data_vio);
  228. zone_count_t zone_number;
  229. struct lbn_lock *lock = &data_vio->logical;
  230. lock->lbn = lbn;
  231. lock->locked = false;
  232. vdo_waitq_init(&lock->waiters);
  233. zone_number = vdo_compute_logical_zone(data_vio);
  234. lock->zone = &vdo->logical_zones->zones[zone_number];
  235. }
  236. static void launch_locked_request(struct data_vio *data_vio)
  237. {
  238. data_vio->logical.locked = true;
  239. if (data_vio->write) {
  240. struct vdo *vdo = vdo_from_data_vio(data_vio);
  241. if (vdo_is_read_only(vdo)) {
  242. continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
  243. return;
  244. }
  245. }
  246. data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT;
  247. vdo_find_block_map_slot(data_vio);
  248. }
  249. static void acknowledge_data_vio(struct data_vio *data_vio)
  250. {
  251. struct vdo *vdo = vdo_from_data_vio(data_vio);
  252. struct bio *bio = data_vio->user_bio;
  253. int error = vdo_status_to_errno(data_vio->vio.completion.result);
  254. if (bio == NULL)
  255. return;
  256. VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <=
  257. (u32) (VDO_BLOCK_SIZE - data_vio->offset)),
  258. "data_vio to acknowledge is not an incomplete discard");
  259. data_vio->user_bio = NULL;
  260. vdo_count_bios(&vdo->stats.bios_acknowledged, bio);
  261. if (data_vio->is_partial)
  262. vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio);
  263. bio->bi_status = errno_to_blk_status(error);
  264. bio_endio(bio);
  265. }
  266. static void copy_to_bio(struct bio *bio, char *data_ptr)
  267. {
  268. struct bio_vec biovec;
  269. struct bvec_iter iter;
  270. bio_for_each_segment(biovec, bio, iter) {
  271. memcpy_to_bvec(&biovec, data_ptr);
  272. data_ptr += biovec.bv_len;
  273. }
  274. }
  275. struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio)
  276. {
  277. u32 packed = atomic_read(&data_vio->compression.status);
  278. /* pairs with cmpxchg in set_data_vio_compression_status */
  279. smp_rmb();
  280. return (struct data_vio_compression_status) {
  281. .stage = packed & COMPRESSION_STATUS_MASK,
  282. .may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0),
  283. };
  284. }
  285. /**
  286. * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
  287. * atomically.
  288. * @status: The state to convert.
  289. *
  290. * Return: The compression state packed into a u32.
  291. */
  292. static u32 __must_check pack_status(struct data_vio_compression_status status)
  293. {
  294. return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
  295. }
  296. /**
  297. * set_data_vio_compression_status() - Set the compression status of a data_vio.
  298. * @data_vio: The data_vio to change.
  299. * @status: The expected current status of the data_vio.
  300. * @new_status: The status to set.
  301. *
  302. * Return: true if the new status was set, false if the data_vio's compression status did not
  303. * match the expected state, and so was left unchanged.
  304. */
  305. static bool __must_check
  306. set_data_vio_compression_status(struct data_vio *data_vio,
  307. struct data_vio_compression_status status,
  308. struct data_vio_compression_status new_status)
  309. {
  310. u32 actual;
  311. u32 expected = pack_status(status);
  312. u32 replacement = pack_status(new_status);
  313. /*
  314. * Extra barriers because this was original developed using a CAS operation that implicitly
  315. * had them.
  316. */
  317. smp_mb__before_atomic();
  318. actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement);
  319. /* same as before_atomic */
  320. smp_mb__after_atomic();
  321. return (expected == actual);
  322. }
  323. struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
  324. {
  325. for (;;) {
  326. struct data_vio_compression_status status =
  327. get_data_vio_compression_status(data_vio);
  328. struct data_vio_compression_status new_status = status;
  329. if (status.stage == DATA_VIO_POST_PACKER) {
  330. /* We're already in the last stage. */
  331. return status;
  332. }
  333. if (status.may_not_compress) {
  334. /*
  335. * Compression has been dis-allowed for this VIO, so skip the rest of the
  336. * path and go to the end.
  337. */
  338. new_status.stage = DATA_VIO_POST_PACKER;
  339. } else {
  340. /* Go to the next state. */
  341. new_status.stage++;
  342. }
  343. if (set_data_vio_compression_status(data_vio, status, new_status))
  344. return new_status;
  345. /* Another thread changed the status out from under us so try again. */
  346. }
  347. }
  348. /**
  349. * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
  350. * @data_vio: The data_vio.
  351. *
  352. * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
  353. */
  354. bool cancel_data_vio_compression(struct data_vio *data_vio)
  355. {
  356. struct data_vio_compression_status status, new_status;
  357. for (;;) {
  358. status = get_data_vio_compression_status(data_vio);
  359. if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) {
  360. /* This data_vio is already set up to not block in the packer. */
  361. break;
  362. }
  363. new_status.stage = status.stage;
  364. new_status.may_not_compress = true;
  365. if (set_data_vio_compression_status(data_vio, status, new_status))
  366. break;
  367. }
  368. return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress);
  369. }
  370. /**
  371. * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
  372. * @completion: The data_vio for an external data request as a completion.
  373. *
  374. * This is the start of the path for all external requests. It is registered in launch_data_vio().
  375. */
  376. static void attempt_logical_block_lock(struct vdo_completion *completion)
  377. {
  378. struct data_vio *data_vio = as_data_vio(completion);
  379. struct lbn_lock *lock = &data_vio->logical;
  380. struct vdo *vdo = vdo_from_data_vio(data_vio);
  381. struct data_vio *lock_holder;
  382. int result;
  383. assert_data_vio_in_logical_zone(data_vio);
  384. if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) {
  385. continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE);
  386. return;
  387. }
  388. result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
  389. data_vio, false, (void **) &lock_holder);
  390. if (result != VDO_SUCCESS) {
  391. continue_data_vio_with_error(data_vio, result);
  392. return;
  393. }
  394. if (lock_holder == NULL) {
  395. /* We got the lock */
  396. launch_locked_request(data_vio);
  397. return;
  398. }
  399. result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held");
  400. if (result != VDO_SUCCESS) {
  401. continue_data_vio_with_error(data_vio, result);
  402. return;
  403. }
  404. /*
  405. * If the new request is a pure read request (not read-modify-write) and the lock_holder is
  406. * writing and has received an allocation, service the read request immediately by copying
  407. * data from the lock_holder to avoid having to flush the write out of the packer just to
  408. * prevent the read from waiting indefinitely. If the lock_holder does not yet have an
  409. * allocation, prevent it from blocking in the packer and wait on it. This is necessary in
  410. * order to prevent returning data that may not have actually been written.
  411. */
  412. if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) {
  413. copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset);
  414. acknowledge_data_vio(data_vio);
  415. complete_data_vio(completion);
  416. return;
  417. }
  418. data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK;
  419. vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter);
  420. /*
  421. * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the
  422. * packer.
  423. */
  424. if (lock_holder->write && cancel_data_vio_compression(lock_holder)) {
  425. data_vio->compression.lock_holder = lock_holder;
  426. launch_data_vio_packer_callback(data_vio,
  427. vdo_remove_lock_holder_from_packer);
  428. }
  429. }
  430. /**
  431. * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the
  432. * same parent and other state and send it on its way.
  433. * @data_vio: The data_vio to launch.
  434. * @lbn: The logical block number.
  435. */
  436. static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn)
  437. {
  438. struct vdo_completion *completion = &data_vio->vio.completion;
  439. /*
  440. * Clearing the tree lock must happen before initializing the LBN lock, which also adds
  441. * information to the tree lock.
  442. */
  443. memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock));
  444. initialize_lbn_lock(data_vio, lbn);
  445. INIT_LIST_HEAD(&data_vio->hash_lock_entry);
  446. INIT_LIST_HEAD(&data_vio->write_entry);
  447. memset(&data_vio->allocation, 0, sizeof(data_vio->allocation));
  448. data_vio->is_duplicate = false;
  449. memset(&data_vio->record_name, 0, sizeof(data_vio->record_name));
  450. memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate));
  451. vdo_reset_completion(&data_vio->decrement_completion);
  452. vdo_reset_completion(completion);
  453. completion->error_handler = handle_data_vio_error;
  454. set_data_vio_logical_callback(data_vio, attempt_logical_block_lock);
  455. vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY);
  456. }
  457. static void copy_from_bio(struct bio *bio, char *data_ptr)
  458. {
  459. struct bio_vec biovec;
  460. struct bvec_iter iter;
  461. bio_for_each_segment(biovec, bio, iter) {
  462. memcpy_from_bvec(data_ptr, &biovec);
  463. data_ptr += biovec.bv_len;
  464. }
  465. }
  466. static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio)
  467. {
  468. logical_block_number_t lbn;
  469. /*
  470. * Zero out the fields which don't need to be preserved (i.e. which are not pointers to
  471. * separately allocated objects).
  472. */
  473. memset(data_vio, 0, offsetof(struct data_vio, vio));
  474. memset(&data_vio->compression, 0, offsetof(struct compression_state, block));
  475. data_vio->user_bio = bio;
  476. data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK);
  477. data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0);
  478. /*
  479. * Discards behave very differently than other requests when coming in from device-mapper.
  480. * We have to be able to handle any size discards and various sector offsets within a
  481. * block.
  482. */
  483. if (bio_op(bio) == REQ_OP_DISCARD) {
  484. data_vio->remaining_discard = bio->bi_iter.bi_size;
  485. data_vio->write = true;
  486. data_vio->is_discard = true;
  487. if (data_vio->is_partial) {
  488. vdo_count_bios(&vdo->stats.bios_in_partial, bio);
  489. data_vio->read = true;
  490. }
  491. } else if (data_vio->is_partial) {
  492. vdo_count_bios(&vdo->stats.bios_in_partial, bio);
  493. data_vio->read = true;
  494. if (bio_data_dir(bio) == WRITE)
  495. data_vio->write = true;
  496. } else if (bio_data_dir(bio) == READ) {
  497. data_vio->read = true;
  498. } else {
  499. /*
  500. * Copy the bio data to a char array so that we can continue to use the data after
  501. * we acknowledge the bio.
  502. */
  503. copy_from_bio(bio, data_vio->vio.data);
  504. data_vio->is_zero = mem_is_zero(data_vio->vio.data, VDO_BLOCK_SIZE);
  505. data_vio->write = true;
  506. }
  507. if (data_vio->user_bio->bi_opf & REQ_FUA)
  508. data_vio->fua = true;
  509. lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK;
  510. launch_data_vio(data_vio, lbn);
  511. }
  512. static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio)
  513. {
  514. struct bio *bio = bio_list_pop(limiter->permitted_waiters);
  515. launch_bio(limiter->pool->completion.vdo, data_vio, bio);
  516. limiter->wake_count++;
  517. bio = bio_list_peek(limiter->permitted_waiters);
  518. limiter->arrival = ((bio == NULL) ? U64_MAX : get_arrival_time(bio));
  519. }
  520. static void assign_discard_permit(struct limiter *limiter)
  521. {
  522. struct bio *bio = bio_list_pop(&limiter->waiters);
  523. if (limiter->arrival == U64_MAX)
  524. limiter->arrival = get_arrival_time(bio);
  525. bio_list_add(limiter->permitted_waiters, bio);
  526. }
  527. static void get_waiters(struct limiter *limiter)
  528. {
  529. bio_list_merge_init(&limiter->waiters, &limiter->new_waiters);
  530. }
  531. static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool)
  532. {
  533. struct data_vio *data_vio =
  534. list_first_entry(&pool->available, struct data_vio, pool_entry);
  535. list_del_init(&data_vio->pool_entry);
  536. return data_vio;
  537. }
  538. static void assign_data_vio_to_waiter(struct limiter *limiter)
  539. {
  540. assign_data_vio(limiter, get_available_data_vio(limiter->pool));
  541. }
  542. static void update_limiter(struct limiter *limiter)
  543. {
  544. struct bio_list *waiters = &limiter->waiters;
  545. data_vio_count_t available = limiter->limit - limiter->busy;
  546. VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy),
  547. "Release count %u is not more than busy count %u",
  548. limiter->release_count, limiter->busy);
  549. get_waiters(limiter);
  550. for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--)
  551. limiter->assigner(limiter);
  552. if (limiter->release_count > 0) {
  553. WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count);
  554. limiter->release_count = 0;
  555. return;
  556. }
  557. for (; (available > 0) && !bio_list_empty(waiters); available--)
  558. limiter->assigner(limiter);
  559. WRITE_ONCE(limiter->busy, limiter->limit - available);
  560. if (limiter->max_busy < limiter->busy)
  561. WRITE_ONCE(limiter->max_busy, limiter->busy);
  562. }
/**
 * schedule_releases() - Ensure that release processing is scheduled.
 * @pool: The data_vio pool.
 *
 * If this call switches the pool's processing flag from false to true, the pool's completion is
 * launched so that process_release_callback() will run. Otherwise, some other thread has already
 * done so and this call does nothing further.
 */
static void schedule_releases(struct data_vio_pool *pool)
{
	/* Pairs with the barrier in process_release_callback(). */
	smp_mb__before_atomic();
	/* atomic_cmpxchg() returns the previous value; non-zero means already processing. */
	if (atomic_cmpxchg(&pool->processing, false, true))
		return;

	/* Request a requeue rather than running the callback directly from this context. */
	pool->completion.requeue = true;
	vdo_launch_completion_with_priority(&pool->completion,
					    CPU_Q_COMPLETE_VIO_PRIORITY);
}
  580. static void reuse_or_release_resources(struct data_vio_pool *pool,
  581. struct data_vio *data_vio,
  582. struct list_head *returned)
  583. {
  584. if (data_vio->remaining_discard > 0) {
  585. if (bio_list_empty(&pool->discard_limiter.waiters)) {
  586. /* Return the data_vio's discard permit. */
  587. pool->discard_limiter.release_count++;
  588. } else {
  589. assign_discard_permit(&pool->discard_limiter);
  590. }
  591. }
  592. if (pool->limiter.arrival < pool->discard_limiter.arrival) {
  593. assign_data_vio(&pool->limiter, data_vio);
  594. } else if (pool->discard_limiter.arrival < U64_MAX) {
  595. assign_data_vio(&pool->discard_limiter, data_vio);
  596. } else {
  597. list_add(&data_vio->pool_entry, returned);
  598. pool->limiter.release_count++;
  599. }
  600. }
/**
 * process_release_callback() - Process a batch of data_vio releases.
 * @completion: The pool with data_vios to release.
 *
 * Polls up to DATA_VIO_RELEASE_BATCH_SIZE released data_vios from the pool's funnel queue,
 * acknowledges each one, and either hands it to a waiting bio or collects it for return to the
 * available list. Afterwards the limiters are updated under the pool lock, blocked threads are
 * woken, and the callback either reschedules itself (if the queue is not empty) or finishes a
 * pending drain.
 */
static void process_release_callback(struct vdo_completion *completion)
{
	struct data_vio_pool *pool = as_data_vio_pool(completion);
	bool reschedule;
	bool drained;
	data_vio_count_t processed;
	data_vio_count_t to_wake;
	data_vio_count_t discards_to_wake;
	/* data_vios released in this batch that no waiter claimed. */
	LIST_HEAD(returned);

	/* Collect waiters that arrived since the last pass; new_waiters is filled under the lock. */
	spin_lock(&pool->lock);
	get_waiters(&pool->discard_limiter);
	get_waiters(&pool->limiter);
	spin_unlock(&pool->lock);

	/* If no arrival time is recorded for the main limiter, take it from its oldest waiter. */
	if (pool->limiter.arrival == U64_MAX) {
		struct bio *bio = bio_list_peek(&pool->limiter.waiters);

		if (bio != NULL)
			pool->limiter.arrival = get_arrival_time(bio);
	}

	for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) {
		struct data_vio *data_vio;
		struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue);

		/* The queue is empty; the batch ends early. */
		if (entry == NULL)
			break;

		data_vio = as_data_vio(container_of(entry, struct vdo_completion,
						    work_queue_entry_link));
		acknowledge_data_vio(data_vio);
		reuse_or_release_resources(pool, data_vio, &returned);
	}

	spin_lock(&pool->lock);
	/*
	 * There is a race where waiters could be added while we are in the unlocked section above.
	 * Those waiters could not see the resources we are now about to release, so we assign
	 * those resources now as we have no guarantee of being rescheduled. This is handled in
	 * update_limiter().
	 */
	update_limiter(&pool->discard_limiter);
	/* Make the unclaimed data_vios available before the main limiter assigns from the list. */
	list_splice(&returned, &pool->available);
	update_limiter(&pool->limiter);
	/* Snapshot and reset the wake counts so the wake-ups can be issued after unlocking. */
	to_wake = pool->limiter.wake_count;
	pool->limiter.wake_count = 0;
	discards_to_wake = pool->discard_limiter.wake_count;
	pool->discard_limiter.wake_count = 0;

	atomic_set(&pool->processing, false);
	/* Pairs with the barrier in schedule_releases(). */
	smp_mb();

	/* Entries enqueued while processing was set did not schedule us, so check again here. */
	reschedule = !vdo_is_funnel_queue_empty(pool->queue);
	drained = (!reschedule &&
		   vdo_is_state_draining(&pool->state) &&
		   check_for_drain_complete_locked(pool));
	spin_unlock(&pool->lock);

	if (to_wake > 0)
		wake_up_nr(&pool->limiter.blocked_threads, to_wake);

	if (discards_to_wake > 0)
		wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake);

	if (reschedule)
		schedule_releases(pool);
	else if (drained)
		vdo_finish_draining(&pool->state);
}
  664. static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool,
  665. assigner_fn assigner, data_vio_count_t limit)
  666. {
  667. limiter->pool = pool;
  668. limiter->assigner = assigner;
  669. limiter->limit = limit;
  670. limiter->arrival = U64_MAX;
  671. init_waitqueue_head(&limiter->blocked_threads);
  672. }
  673. /**
  674. * initialize_data_vio() - Allocate the components of a data_vio.
  675. * @data_vio: The data_vio to initialize.
  676. * @vdo: The vdo containing the data_vio.
  677. *
  678. * The caller is responsible for cleaning up the data_vio on error.
  679. *
  680. * Return: VDO_SUCCESS or an error.
  681. */
  682. static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo)
  683. {
  684. struct bio *bio;
  685. int result;
  686. BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE);
  687. result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data",
  688. &data_vio->vio.data);
  689. if (result != VDO_SUCCESS)
  690. return vdo_log_error_strerror(result,
  691. "data_vio data allocation failure");
  692. result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block",
  693. &data_vio->compression.block);
  694. if (result != VDO_SUCCESS) {
  695. return vdo_log_error_strerror(result,
  696. "data_vio compressed block allocation failure");
  697. }
  698. result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch",
  699. &data_vio->scratch_block);
  700. if (result != VDO_SUCCESS)
  701. return vdo_log_error_strerror(result,
  702. "data_vio scratch allocation failure");
  703. result = vdo_create_bio(&bio);
  704. if (result != VDO_SUCCESS)
  705. return vdo_log_error_strerror(result,
  706. "data_vio data bio allocation failure");
  707. vdo_initialize_completion(&data_vio->decrement_completion, vdo,
  708. VDO_DECREMENT_COMPLETION);
  709. initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo);
  710. return VDO_SUCCESS;
  711. }
  712. static void destroy_data_vio(struct data_vio *data_vio)
  713. {
  714. if (data_vio == NULL)
  715. return;
  716. vdo_free_bio(vdo_forget(data_vio->vio.bio));
  717. vdo_free(vdo_forget(data_vio->vio.data));
  718. vdo_free(vdo_forget(data_vio->compression.block));
  719. vdo_free(vdo_forget(data_vio->scratch_block));
  720. }
  721. /**
  722. * make_data_vio_pool() - Initialize a data_vio pool.
  723. * @vdo: The vdo to which the pool will belong.
  724. * @pool_size: The number of data_vios in the pool.
  725. * @discard_limit: The maximum number of data_vios which may be used for discards.
  726. * @pool_ptr: A pointer to hold the newly allocated pool.
  727. */
  728. int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size,
  729. data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr)
  730. {
  731. int result;
  732. struct data_vio_pool *pool;
  733. data_vio_count_t i;
  734. result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio,
  735. __func__, &pool);
  736. if (result != VDO_SUCCESS)
  737. return result;
  738. VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size),
  739. "discard limit does not exceed pool size");
  740. initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit,
  741. discard_limit);
  742. pool->discard_limiter.permitted_waiters = &pool->permitted_discards;
  743. initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size);
  744. pool->limiter.permitted_waiters = &pool->limiter.waiters;
  745. INIT_LIST_HEAD(&pool->available);
  746. spin_lock_init(&pool->lock);
  747. vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
  748. vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION);
  749. vdo_prepare_completion(&pool->completion, process_release_callback,
  750. process_release_callback, vdo->thread_config.cpu_thread,
  751. NULL);
  752. result = vdo_make_funnel_queue(&pool->queue);
  753. if (result != VDO_SUCCESS) {
  754. free_data_vio_pool(vdo_forget(pool));
  755. return result;
  756. }
  757. for (i = 0; i < pool_size; i++) {
  758. struct data_vio *data_vio = &pool->data_vios[i];
  759. result = initialize_data_vio(data_vio, vdo);
  760. if (result != VDO_SUCCESS) {
  761. destroy_data_vio(data_vio);
  762. free_data_vio_pool(pool);
  763. return result;
  764. }
  765. list_add(&data_vio->pool_entry, &pool->available);
  766. }
  767. *pool_ptr = pool;
  768. return VDO_SUCCESS;
  769. }
  770. /**
  771. * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it.
  772. * @pool: The data_vio pool to free.
  773. *
  774. * All data_vios must be returned to the pool before calling this function.
  775. */
  776. void free_data_vio_pool(struct data_vio_pool *pool)
  777. {
  778. struct data_vio *data_vio, *tmp;
  779. if (pool == NULL)
  780. return;
  781. /*
  782. * Pairs with the barrier in process_release_callback(). Possibly not needed since it
  783. * caters to an enqueue vs. free race.
  784. */
  785. smp_mb();
  786. BUG_ON(atomic_read(&pool->processing));
  787. spin_lock(&pool->lock);
  788. VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0),
  789. "data_vio pool must not have %u busy entries when being freed",
  790. pool->limiter.busy);
  791. VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) &&
  792. bio_list_empty(&pool->limiter.new_waiters)),
  793. "data_vio pool must not have threads waiting to read or write when being freed");
  794. VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) &&
  795. bio_list_empty(&pool->discard_limiter.new_waiters)),
  796. "data_vio pool must not have threads waiting to discard when being freed");
  797. spin_unlock(&pool->lock);
  798. list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) {
  799. list_del_init(&data_vio->pool_entry);
  800. destroy_data_vio(data_vio);
  801. }
  802. vdo_free_funnel_queue(vdo_forget(pool->queue));
  803. vdo_free(pool);
  804. }
  805. static bool acquire_permit(struct limiter *limiter)
  806. {
  807. if (limiter->busy >= limiter->limit)
  808. return false;
  809. WRITE_ONCE(limiter->busy, limiter->busy + 1);
  810. if (limiter->max_busy < limiter->busy)
  811. WRITE_ONCE(limiter->max_busy, limiter->busy);
  812. return true;
  813. }
/*
 * Queue a bio on a limiter and block the calling thread until it is woken.
 *
 * Entered with the pool lock held (see the __releases annotation); the lock is dropped
 * before sleeping and is not reacquired on return.
 */
static void wait_permit(struct limiter *limiter, struct bio *bio)
	__releases(&limiter->pool->lock)
{
	DEFINE_WAIT(wait);

	bio_list_add(&limiter->new_waiters, bio);
	/* Register on the wait queue before dropping the lock, so a wake-up cannot be missed. */
	prepare_to_wait_exclusive(&limiter->blocked_threads, &wait,
				  TASK_UNINTERRUPTIBLE);
	spin_unlock(&limiter->pool->lock);
	io_schedule();
	finish_wait(&limiter->blocked_threads, &wait);
}
  825. /**
  826. * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it.
  827. * @pool: The data_vio pool.
  828. * @bio: The bio to launch.
  829. *
  830. * This will block if data_vios or discard permits are not available.
  831. */
  832. void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio)
  833. {
  834. struct data_vio *data_vio;
  835. VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state),
  836. "data_vio_pool not quiescent on acquire");
  837. bio->bi_private = (void *) jiffies;
  838. spin_lock(&pool->lock);
  839. if ((bio_op(bio) == REQ_OP_DISCARD) &&
  840. !acquire_permit(&pool->discard_limiter)) {
  841. wait_permit(&pool->discard_limiter, bio);
  842. return;
  843. }
  844. if (!acquire_permit(&pool->limiter)) {
  845. wait_permit(&pool->limiter, bio);
  846. return;
  847. }
  848. data_vio = get_available_data_vio(pool);
  849. spin_unlock(&pool->lock);
  850. launch_bio(pool->completion.vdo, data_vio, bio);
  851. }
  852. /* Implements vdo_admin_initiator_fn. */
  853. static void initiate_drain(struct admin_state *state)
  854. {
  855. bool drained;
  856. struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state);
  857. spin_lock(&pool->lock);
  858. drained = check_for_drain_complete_locked(pool);
  859. spin_unlock(&pool->lock);
  860. if (drained)
  861. vdo_finish_draining(state);
  862. }
  863. static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name)
  864. {
  865. VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread),
  866. "%s called on cpu thread", name);
  867. }
  868. /**
  869. * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool.
  870. * @pool: The data_vio pool.
  871. * @completion: The completion to notify when the pool has drained.
  872. */
  873. void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
  874. {
  875. assert_on_vdo_cpu_thread(completion->vdo, __func__);
  876. vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion,
  877. initiate_drain);
  878. }
  879. /**
  880. * resume_data_vio_pool() - Resume a data_vio pool.
  881. * @pool: The data_vio pool.
  882. * @completion: The completion to notify when the pool has resumed.
  883. */
  884. void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
  885. {
  886. assert_on_vdo_cpu_thread(completion->vdo, __func__);
  887. vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state));
  888. }
  889. static void dump_limiter(const char *name, struct limiter *limiter)
  890. {
  891. vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy,
  892. limiter->limit, limiter->max_busy,
  893. ((bio_list_empty(&limiter->waiters) &&
  894. bio_list_empty(&limiter->new_waiters)) ?
  895. "no waiters" : "has waiters"));
  896. }
  897. /**
  898. * dump_data_vio_pool() - Dump a data_vio pool to the log.
  899. * @pool: The data_vio pool.
  900. * @dump_vios: Whether to dump the details of each busy data_vio as well.
  901. */
  902. void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios)
  903. {
  904. /*
  905. * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the
  906. * second clock tick). These numbers were picked based on experiments with lab machines.
  907. */
  908. static const int ELEMENTS_PER_BATCH = 35;
  909. static const int SLEEP_FOR_SYSLOG = 4000;
  910. if (pool == NULL)
  911. return;
  912. spin_lock(&pool->lock);
  913. dump_limiter("data_vios", &pool->limiter);
  914. dump_limiter("discard permits", &pool->discard_limiter);
  915. if (dump_vios) {
  916. int i;
  917. int dumped = 0;
  918. for (i = 0; i < pool->limiter.limit; i++) {
  919. struct data_vio *data_vio = &pool->data_vios[i];
  920. if (!list_empty(&data_vio->pool_entry))
  921. continue;
  922. dump_data_vio(data_vio);
  923. if (++dumped >= ELEMENTS_PER_BATCH) {
  924. spin_unlock(&pool->lock);
  925. dumped = 0;
  926. fsleep(SLEEP_FOR_SYSLOG);
  927. spin_lock(&pool->lock);
  928. }
  929. }
  930. }
  931. spin_unlock(&pool->lock);
  932. }
  933. data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool)
  934. {
  935. return READ_ONCE(pool->limiter.busy);
  936. }
  937. data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool)
  938. {
  939. return READ_ONCE(pool->limiter.limit);
  940. }
  941. data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool)
  942. {
  943. return READ_ONCE(pool->limiter.max_busy);
  944. }
  945. static void update_data_vio_error_stats(struct data_vio *data_vio)
  946. {
  947. u8 index = 0;
  948. static const char * const operations[] = {
  949. [0] = "empty",
  950. [1] = "read",
  951. [2] = "write",
  952. [3] = "read-modify-write",
  953. [5] = "read+fua",
  954. [6] = "write+fua",
  955. [7] = "read-modify-write+fua",
  956. };
  957. if (data_vio->read)
  958. index = 1;
  959. if (data_vio->write)
  960. index += 2;
  961. if (data_vio->fua)
  962. index += 4;
  963. update_vio_error_stats(&data_vio->vio,
  964. "Completing %s vio for LBN %llu with error after %s",
  965. operations[index],
  966. (unsigned long long) data_vio->logical.lbn,
  967. get_data_vio_operation_name(data_vio));
  968. }
  969. static void perform_cleanup_stage(struct data_vio *data_vio,
  970. enum data_vio_cleanup_stage stage);
  971. /**
  972. * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at
  973. * the end of processing a data_vio.
  974. * @completion: The data_vio holding the lock.
  975. */
  976. static void release_allocated_lock(struct vdo_completion *completion)
  977. {
  978. struct data_vio *data_vio = as_data_vio(completion);
  979. assert_data_vio_in_allocated_zone(data_vio);
  980. release_data_vio_allocation_lock(data_vio, false);
  981. perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS);
  982. }
  983. /** release_lock() - Release an uncontended LBN lock. */
  984. static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock)
  985. {
  986. struct int_map *lock_map = lock->zone->lbn_operations;
  987. struct data_vio *lock_holder;
  988. if (!lock->locked) {
  989. /* The lock is not locked, so it had better not be registered in the lock map. */
  990. struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn);
  991. VDO_ASSERT_LOG_ONLY((data_vio != lock_holder),
  992. "no logical block lock held for block %llu",
  993. (unsigned long long) lock->lbn);
  994. return;
  995. }
  996. /* Release the lock by removing the lock from the map. */
  997. lock_holder = vdo_int_map_remove(lock_map, lock->lbn);
  998. VDO_ASSERT_LOG_ONLY((data_vio == lock_holder),
  999. "logical block lock mismatch for block %llu",
  1000. (unsigned long long) lock->lbn);
  1001. lock->locked = false;
  1002. }
/**
 * transfer_lock() - Transfer a contended LBN lock to the eldest waiter.
 * @data_vio: The data_vio currently holding the lock.
 * @lock: The logical block lock being handed over; it must have at least one waiter.
 *
 * The first waiter becomes the new holder, inherits the remaining waiters, replaces @data_vio in
 * the zone's lock map, and is then launched as a locked request.
 */
static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock)
{
	struct data_vio *lock_holder, *next_lock_holder;
	int result;

	VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked");

	/* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */
	next_lock_holder =
		vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));

	/* Transfer the remaining lock waiters to the next lock holder. */
	vdo_waitq_transfer_all_waiters(&lock->waiters,
				       &next_lock_holder->logical.waiters);

	/* Replace the map entry for this LBN; the previous holder is returned in lock_holder. */
	result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
				 next_lock_holder, true, (void **) &lock_holder);
	if (result != VDO_SUCCESS) {
		/* The new holder, which now carries the other waiters, is failed with the error. */
		continue_data_vio_with_error(next_lock_holder, result);
		return;
	}

	VDO_ASSERT_LOG_ONLY((lock_holder == data_vio),
			    "logical block lock mismatch for block %llu",
			    (unsigned long long) lock->lbn);
	lock->locked = false;

	/*
	 * If there are still waiters, other data_vios must be trying to get the lock we just
	 * transferred. We must ensure that the new lock holder doesn't block in the packer.
	 */
	if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters))
		cancel_data_vio_compression(next_lock_holder);

	/*
	 * Avoid stack overflow on lock transfer.
	 * FIXME: this is only an issue in the 1 thread config.
	 */
	next_lock_holder->vio.completion.requeue = true;
	launch_locked_request(next_lock_holder);
}
  1038. /**
  1039. * release_logical_lock() - Release the logical block lock and flush generation lock at the end of
  1040. * processing a data_vio.
  1041. * @completion: The data_vio holding the lock.
  1042. */
  1043. static void release_logical_lock(struct vdo_completion *completion)
  1044. {
  1045. struct data_vio *data_vio = as_data_vio(completion);
  1046. struct lbn_lock *lock = &data_vio->logical;
  1047. assert_data_vio_in_logical_zone(data_vio);
  1048. if (vdo_waitq_has_waiters(&lock->waiters))
  1049. transfer_lock(data_vio, lock);
  1050. else
  1051. release_lock(data_vio, lock);
  1052. vdo_release_flush_generation_lock(data_vio);
  1053. perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE);
  1054. }
  1055. /** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */
  1056. static void clean_hash_lock(struct vdo_completion *completion)
  1057. {
  1058. struct data_vio *data_vio = as_data_vio(completion);
  1059. assert_data_vio_in_hash_zone(data_vio);
  1060. if (completion->result != VDO_SUCCESS) {
  1061. vdo_clean_failed_hash_lock(data_vio);
  1062. return;
  1063. }
  1064. vdo_release_hash_lock(data_vio);
  1065. perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL);
  1066. }
  1067. /**
  1068. * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up.
  1069. * @data_vio: The data_vio.
  1070. *
  1071. * If it is part of a multi-block discard, starts on the next block, otherwise, returns it to the
  1072. * pool.
  1073. */
  1074. static void finish_cleanup(struct data_vio *data_vio)
  1075. {
  1076. struct vdo_completion *completion = &data_vio->vio.completion;
  1077. u32 discard_size = min_t(u32, data_vio->remaining_discard,
  1078. VDO_BLOCK_SIZE - data_vio->offset);
  1079. VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL,
  1080. "complete data_vio has no allocation lock");
  1081. VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL,
  1082. "complete data_vio has no hash lock");
  1083. if ((data_vio->remaining_discard <= discard_size) ||
  1084. (completion->result != VDO_SUCCESS)) {
  1085. struct data_vio_pool *pool = completion->vdo->data_vio_pool;
  1086. vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link);
  1087. schedule_releases(pool);
  1088. return;
  1089. }
  1090. data_vio->remaining_discard -= discard_size;
  1091. data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE);
  1092. data_vio->read = data_vio->is_partial;
  1093. data_vio->offset = 0;
  1094. completion->requeue = true;
  1095. data_vio->first_reference_operation_complete = false;
  1096. launch_data_vio(data_vio, data_vio->logical.lbn + 1);
  1097. }
/**
 * perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio.
 * @data_vio: The data_vio to clean up.
 * @stage: The stage at which to start (or resume) cleanup.
 *
 * Each case either launches a callback to do that stage's work and returns, or, when the stage
 * has nothing to do, falls through to the next stage. The launched callbacks in this file call
 * back into this function with the following stage. Any stage value without its own case (such
 * as VIO_CLEANUP_DONE, passed by release_logical_lock()) finishes the cleanup.
 */
static void perform_cleanup_stage(struct data_vio *data_vio,
				  enum data_vio_cleanup_stage stage)
{
	struct vdo *vdo = vdo_from_data_vio(data_vio);

	switch (stage) {
	case VIO_RELEASE_HASH_LOCK:
		/* Only visit the hash zone if there is a hash lock to release. */
		if (data_vio->hash_lock != NULL) {
			launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock);
			return;
		}
		fallthrough;

	case VIO_RELEASE_ALLOCATED:
		/* Only visit the allocated zone if there is an allocation to release. */
		if (data_vio_has_allocation(data_vio)) {
			launch_data_vio_allocated_zone_callback(data_vio,
								release_allocated_lock);
			return;
		}
		fallthrough;

	case VIO_RELEASE_RECOVERY_LOCKS:
		/* Nothing is released here; this stage only warns about an unexpected state. */
		if ((data_vio->recovery_sequence_number > 0) &&
		    (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) &&
		    (data_vio->vio.completion.result != VDO_READ_ONLY))
			vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock");
		fallthrough;

	case VIO_RELEASE_LOGICAL:
		/* This stage always launches its callback. */
		launch_data_vio_logical_callback(data_vio, release_logical_lock);
		return;

	default:
		finish_cleanup(data_vio);
	}
}
  1130. void complete_data_vio(struct vdo_completion *completion)
  1131. {
  1132. struct data_vio *data_vio = as_data_vio(completion);
  1133. completion->error_handler = NULL;
  1134. data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP;
  1135. perform_cleanup_stage(data_vio,
  1136. (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL));
  1137. }
  1138. static void enter_read_only_mode(struct vdo_completion *completion)
  1139. {
  1140. if (vdo_is_read_only(completion->vdo))
  1141. return;
  1142. if (completion->result != VDO_READ_ONLY) {
  1143. struct data_vio *data_vio = as_data_vio(completion);
  1144. vdo_log_error_strerror(completion->result,
  1145. "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s",
  1146. (unsigned long long) data_vio->logical.lbn,
  1147. (unsigned long long) data_vio->new_mapped.pbn,
  1148. (unsigned long long) data_vio->mapped.pbn,
  1149. (unsigned long long) data_vio->allocation.pbn,
  1150. get_data_vio_operation_name(data_vio));
  1151. }
  1152. vdo_enter_read_only_mode(completion->vdo, completion->result);
  1153. }
  1154. void handle_data_vio_error(struct vdo_completion *completion)
  1155. {
  1156. struct data_vio *data_vio = as_data_vio(completion);
  1157. if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL))
  1158. enter_read_only_mode(completion);
  1159. update_data_vio_error_stats(data_vio);
  1160. complete_data_vio(completion);
  1161. }
  1162. /**
  1163. * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a
  1164. * data_vio.
  1165. * @data_vio: The data_vio.
  1166. */
  1167. const char *get_data_vio_operation_name(struct data_vio *data_vio)
  1168. {
  1169. BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) !=
  1170. ARRAY_SIZE(ASYNC_OPERATION_NAMES));
  1171. return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ?
  1172. ASYNC_OPERATION_NAMES[data_vio->last_async_operation] :
  1173. "unknown async operation");
  1174. }
  1175. /**
  1176. * data_vio_allocate_data_block() - Allocate a data block.
  1177. * @data_vio: The data_vio.
  1178. * @write_lock_type: The type of write lock to obtain on the block.
  1179. * @callback: The callback which will attempt an allocation in the current zone and continue if it
  1180. * succeeds.
  1181. * @error_handler: The handler for errors while allocating.
  1182. */
  1183. void data_vio_allocate_data_block(struct data_vio *data_vio,
  1184. enum pbn_lock_type write_lock_type,
  1185. vdo_action_fn callback, vdo_action_fn error_handler)
  1186. {
  1187. struct allocation *allocation = &data_vio->allocation;
  1188. VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK),
  1189. "data_vio does not have an allocation");
  1190. allocation->write_lock_type = write_lock_type;
  1191. allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone);
  1192. allocation->first_allocation_zone = allocation->zone->zone_number;
  1193. data_vio->vio.completion.error_handler = error_handler;
  1194. launch_data_vio_allocated_zone_callback(data_vio, callback);
  1195. }
  1196. /**
  1197. * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block.
  1198. * @data_vio: The data_vio.
  1199. * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten).
  1200. *
  1201. * If the reference to the locked block is still provisional, it will be released as well.
  1202. */
  1203. void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset)
  1204. {
  1205. struct allocation *allocation = &data_vio->allocation;
  1206. physical_block_number_t locked_pbn = allocation->pbn;
  1207. assert_data_vio_in_allocated_zone(data_vio);
  1208. if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock))
  1209. allocation->pbn = VDO_ZERO_BLOCK;
  1210. vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn,
  1211. vdo_forget(allocation->lock));
  1212. }
  1213. /**
  1214. * uncompress_data_vio() - Uncompress the data a data_vio has just read.
  1215. * @data_vio: The data_vio.
  1216. * @mapping_state: The mapping state indicating which fragment to decompress.
  1217. * @buffer: The buffer to receive the uncompressed data.
  1218. */
  1219. int uncompress_data_vio(struct data_vio *data_vio,
  1220. enum block_mapping_state mapping_state, char *buffer)
  1221. {
  1222. int size;
  1223. u16 fragment_offset, fragment_size;
  1224. struct compressed_block *block = data_vio->compression.block;
  1225. int result = vdo_get_compressed_block_fragment(mapping_state, block,
  1226. &fragment_offset, &fragment_size);
  1227. if (result != VDO_SUCCESS) {
  1228. vdo_log_debug("%s: compressed fragment error %d", __func__, result);
  1229. return result;
  1230. }
  1231. size = LZ4_decompress_safe((block->data + fragment_offset), buffer,
  1232. fragment_size, VDO_BLOCK_SIZE);
  1233. if (size != VDO_BLOCK_SIZE) {
  1234. vdo_log_debug("%s: lz4 error", __func__);
  1235. return VDO_INVALID_FRAGMENT;
  1236. }
  1237. return VDO_SUCCESS;
  1238. }
  1239. /**
  1240. * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle.
  1241. * @completion: The data_vio which has just finished its read.
  1242. *
  1243. * This callback is registered in read_block().
  1244. */
  1245. static void modify_for_partial_write(struct vdo_completion *completion)
  1246. {
  1247. struct data_vio *data_vio = as_data_vio(completion);
  1248. char *data = data_vio->vio.data;
  1249. struct bio *bio = data_vio->user_bio;
  1250. assert_data_vio_on_cpu_thread(data_vio);
  1251. if (bio_op(bio) == REQ_OP_DISCARD) {
  1252. memset(data + data_vio->offset, '\0', min_t(u32,
  1253. data_vio->remaining_discard,
  1254. VDO_BLOCK_SIZE - data_vio->offset));
  1255. } else {
  1256. copy_from_bio(bio, data + data_vio->offset);
  1257. }
  1258. data_vio->is_zero = mem_is_zero(data, VDO_BLOCK_SIZE);
  1259. data_vio->read = false;
  1260. launch_data_vio_logical_callback(data_vio,
  1261. continue_data_vio_with_block_map_slot);
  1262. }
  1263. static void complete_read(struct vdo_completion *completion)
  1264. {
  1265. struct data_vio *data_vio = as_data_vio(completion);
  1266. char *data = data_vio->vio.data;
  1267. bool compressed = vdo_is_state_compressed(data_vio->mapped.state);
  1268. assert_data_vio_on_cpu_thread(data_vio);
  1269. if (compressed) {
  1270. int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data);
  1271. if (result != VDO_SUCCESS) {
  1272. continue_data_vio_with_error(data_vio, result);
  1273. return;
  1274. }
  1275. }
  1276. if (data_vio->write) {
  1277. modify_for_partial_write(completion);
  1278. return;
  1279. }
  1280. if (compressed || data_vio->is_partial)
  1281. copy_to_bio(data_vio->user_bio, data + data_vio->offset);
  1282. acknowledge_data_vio(data_vio);
  1283. complete_data_vio(completion);
  1284. }
  1285. static void read_endio(struct bio *bio)
  1286. {
  1287. struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);
  1288. int result = blk_status_to_errno(bio->bi_status);
  1289. vdo_count_completed_bios(bio);
  1290. if (result != VDO_SUCCESS) {
  1291. continue_data_vio_with_error(data_vio, result);
  1292. return;
  1293. }
  1294. launch_data_vio_cpu_callback(data_vio, complete_read,
  1295. CPU_Q_COMPLETE_READ_PRIORITY);
  1296. }
  1297. static void complete_zero_read(struct vdo_completion *completion)
  1298. {
  1299. struct data_vio *data_vio = as_data_vio(completion);
  1300. assert_data_vio_on_cpu_thread(data_vio);
  1301. if (data_vio->is_partial) {
  1302. memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE);
  1303. if (data_vio->write) {
  1304. modify_for_partial_write(completion);
  1305. return;
  1306. }
  1307. } else {
  1308. zero_fill_bio(data_vio->user_bio);
  1309. }
  1310. complete_read(completion);
  1311. }
  1312. /**
  1313. * read_block() - Read a block asynchronously.
  1314. * @completion: The data_vio doing the read.
  1315. *
  1316. * This is the callback registered in read_block_mapping().
  1317. */
  1318. static void read_block(struct vdo_completion *completion)
  1319. {
  1320. struct data_vio *data_vio = as_data_vio(completion);
  1321. struct vio *vio = as_vio(completion);
  1322. int result = VDO_SUCCESS;
  1323. if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
  1324. launch_data_vio_cpu_callback(data_vio, complete_zero_read,
  1325. CPU_Q_COMPLETE_VIO_PRIORITY);
  1326. return;
  1327. }
  1328. data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO;
  1329. if (vdo_is_state_compressed(data_vio->mapped.state)) {
  1330. result = vio_reset_bio(vio, (char *) data_vio->compression.block,
  1331. read_endio, REQ_OP_READ, data_vio->mapped.pbn);
  1332. } else {
  1333. blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ);
  1334. if (data_vio->is_partial) {
  1335. result = vio_reset_bio(vio, vio->data, read_endio, opf,
  1336. data_vio->mapped.pbn);
  1337. } else {
  1338. /* A full 4k read. Use the incoming bio to avoid having to copy the data */
  1339. bio_reset(vio->bio, vio->bio->bi_bdev, opf);
  1340. bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio,
  1341. data_vio->user_bio, GFP_KERNEL);
  1342. /* Copy over the original bio iovec and opflags. */
  1343. vdo_set_bio_properties(vio->bio, vio, read_endio, opf,
  1344. data_vio->mapped.pbn);
  1345. }
  1346. }
  1347. if (result != VDO_SUCCESS) {
  1348. continue_data_vio_with_error(data_vio, result);
  1349. return;
  1350. }
  1351. vdo_submit_data_vio(data_vio);
  1352. }
  1353. static inline struct data_vio *
  1354. reference_count_update_completion_as_data_vio(struct vdo_completion *completion)
  1355. {
  1356. if (completion->type == VIO_COMPLETION)
  1357. return as_data_vio(completion);
  1358. return container_of(completion, struct data_vio, decrement_completion);
  1359. }
  1360. /**
  1361. * update_block_map() - Rendezvous of the data_vio and decrement completions after each has
  1362. * made its reference updates. Handle any error from either, or proceed
  1363. * to updating the block map.
  1364. * @completion: The completion of the write in progress.
  1365. */
  1366. static void update_block_map(struct vdo_completion *completion)
  1367. {
  1368. struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion);
  1369. assert_data_vio_in_logical_zone(data_vio);
  1370. if (!data_vio->first_reference_operation_complete) {
  1371. /* Rendezvous, we're first */
  1372. data_vio->first_reference_operation_complete = true;
  1373. return;
  1374. }
  1375. completion = &data_vio->vio.completion;
  1376. vdo_set_completion_result(completion, data_vio->decrement_completion.result);
  1377. if (completion->result != VDO_SUCCESS) {
  1378. handle_data_vio_error(completion);
  1379. return;
  1380. }
  1381. completion->error_handler = handle_data_vio_error;
  1382. if (data_vio->hash_lock != NULL)
  1383. set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock);
  1384. else
  1385. completion->callback = complete_data_vio;
  1386. data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK;
  1387. vdo_put_mapped_block(data_vio);
  1388. }
  1389. static void decrement_reference_count(struct vdo_completion *completion)
  1390. {
  1391. struct data_vio *data_vio = container_of(completion, struct data_vio,
  1392. decrement_completion);
  1393. assert_data_vio_in_mapped_zone(data_vio);
  1394. vdo_set_completion_callback(completion, update_block_map,
  1395. data_vio->logical.zone->thread_id);
  1396. completion->error_handler = update_block_map;
  1397. vdo_modify_reference_count(completion, &data_vio->decrement_updater);
  1398. }
  1399. static void increment_reference_count(struct vdo_completion *completion)
  1400. {
  1401. struct data_vio *data_vio = as_data_vio(completion);
  1402. assert_data_vio_in_new_mapped_zone(data_vio);
  1403. if (data_vio->downgrade_allocation_lock) {
  1404. /*
  1405. * Now that the data has been written, it's safe to deduplicate against the
  1406. * block. Downgrade the allocation lock to a read lock so it can be used later by
  1407. * the hash lock. This is done here since it needs to happen sometime before we
  1408. * return to the hash zone, and we are currently on the correct thread. For
  1409. * compressed blocks, the downgrade will have already been done.
  1410. */
  1411. vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false);
  1412. }
  1413. set_data_vio_logical_callback(data_vio, update_block_map);
  1414. completion->error_handler = update_block_map;
  1415. vdo_modify_reference_count(completion, &data_vio->increment_updater);
  1416. }
  1417. /** journal_remapping() - Add a recovery journal entry for a data remapping. */
  1418. static void journal_remapping(struct vdo_completion *completion)
  1419. {
  1420. struct data_vio *data_vio = as_data_vio(completion);
  1421. assert_data_vio_in_journal_zone(data_vio);
  1422. data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING;
  1423. data_vio->decrement_updater.zpbn = data_vio->mapped;
  1424. if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
  1425. data_vio->first_reference_operation_complete = true;
  1426. if (data_vio->mapped.pbn == VDO_ZERO_BLOCK)
  1427. set_data_vio_logical_callback(data_vio, update_block_map);
  1428. } else {
  1429. set_data_vio_new_mapped_zone_callback(data_vio,
  1430. increment_reference_count);
  1431. }
  1432. if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
  1433. data_vio->first_reference_operation_complete = true;
  1434. } else {
  1435. vdo_set_completion_callback(&data_vio->decrement_completion,
  1436. decrement_reference_count,
  1437. data_vio->mapped.zone->thread_id);
  1438. }
  1439. data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING;
  1440. vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
  1441. }
  1442. /**
  1443. * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
  1444. * @completion: The data_vio doing the read.
  1445. *
  1446. * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate
  1447. * journal entry referencing the removal of this LBN->PBN mapping.
  1448. */
  1449. static void read_old_block_mapping(struct vdo_completion *completion)
  1450. {
  1451. struct data_vio *data_vio = as_data_vio(completion);
  1452. assert_data_vio_in_logical_zone(data_vio);
  1453. data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE;
  1454. set_data_vio_journal_callback(data_vio, journal_remapping);
  1455. vdo_get_mapped_block(data_vio);
  1456. }
  1457. void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock)
  1458. {
  1459. data_vio->increment_updater = (struct reference_updater) {
  1460. .operation = VDO_JOURNAL_DATA_REMAPPING,
  1461. .increment = true,
  1462. .zpbn = data_vio->new_mapped,
  1463. .lock = lock,
  1464. };
  1465. launch_data_vio_logical_callback(data_vio, read_old_block_mapping);
  1466. }
  1467. /**
  1468. * pack_compressed_data() - Attempt to pack the compressed data_vio into a block.
  1469. * @completion: The data_vio.
  1470. *
  1471. * This is the callback registered in launch_compress_data_vio().
  1472. */
  1473. static void pack_compressed_data(struct vdo_completion *completion)
  1474. {
  1475. struct data_vio *data_vio = as_data_vio(completion);
  1476. assert_data_vio_in_packer_zone(data_vio);
  1477. if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
  1478. get_data_vio_compression_status(data_vio).may_not_compress) {
  1479. write_data_vio(data_vio);
  1480. return;
  1481. }
  1482. data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING;
  1483. vdo_attempt_packing(data_vio);
  1484. }
  1485. /**
  1486. * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
  1487. * @completion: The data_vio.
  1488. *
  1489. * This callback is registered in launch_compress_data_vio().
  1490. */
  1491. static void compress_data_vio(struct vdo_completion *completion)
  1492. {
  1493. struct data_vio *data_vio = as_data_vio(completion);
  1494. int size;
  1495. assert_data_vio_on_cpu_thread(data_vio);
  1496. /*
  1497. * By putting the compressed data at the start of the compressed block data field, we won't
  1498. * need to copy it if this data_vio becomes a compressed write agent.
  1499. */
  1500. size = LZ4_compress_default(data_vio->vio.data,
  1501. data_vio->compression.block->data, VDO_BLOCK_SIZE,
  1502. VDO_MAX_COMPRESSED_FRAGMENT_SIZE,
  1503. (char *) vdo_get_work_queue_private_data());
  1504. if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) {
  1505. data_vio->compression.size = size;
  1506. launch_data_vio_packer_callback(data_vio, pack_compressed_data);
  1507. return;
  1508. }
  1509. write_data_vio(data_vio);
  1510. }
  1511. /**
  1512. * launch_compress_data_vio() - Continue a write by attempting to compress the data.
  1513. * @data_vio: The data_vio.
  1514. *
  1515. * This is a re-entry point to vio_write used by hash locks.
  1516. */
  1517. void launch_compress_data_vio(struct data_vio *data_vio)
  1518. {
  1519. VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block");
  1520. VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL,
  1521. "data_vio to compress has a hash_lock");
  1522. VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio),
  1523. "data_vio to compress has an allocation");
  1524. /*
  1525. * There are 4 reasons why a data_vio which has reached this point will not be eligible for
  1526. * compression:
  1527. *
  1528. * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the
  1529. * write request also requests FUA.
  1530. *
  1531. * 2) A data_vio should not be compressed when compression is disabled for the vdo.
  1532. *
  1533. * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not
  1534. * yet been acknowledged and hence blocking in the packer would be bad.
  1535. *
  1536. * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the
  1537. * packer would also be bad.
  1538. */
  1539. if (data_vio->fua ||
  1540. !vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
  1541. ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) ||
  1542. (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) {
  1543. write_data_vio(data_vio);
  1544. return;
  1545. }
  1546. data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO;
  1547. launch_data_vio_cpu_callback(data_vio, compress_data_vio,
  1548. CPU_Q_COMPRESS_BLOCK_PRIORITY);
  1549. }
  1550. /**
  1551. * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record
  1552. * name as set).
  1553. * @completion: The data_vio.
  1554. *
  1555. * This callback is registered in prepare_for_dedupe().
  1556. */
  1557. static void hash_data_vio(struct vdo_completion *completion)
  1558. {
  1559. struct data_vio *data_vio = as_data_vio(completion);
  1560. assert_data_vio_on_cpu_thread(data_vio);
  1561. VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed");
  1562. murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be,
  1563. &data_vio->record_name);
  1564. data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones,
  1565. &data_vio->record_name);
  1566. data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK;
  1567. launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock);
  1568. }
  1569. /** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */
  1570. static void prepare_for_dedupe(struct data_vio *data_vio)
  1571. {
  1572. /* We don't care what thread we are on. */
  1573. VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks");
  1574. /*
  1575. * Before we can dedupe, we need to know the record name, so the first
  1576. * step is to hash the block data.
  1577. */
  1578. data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO;
  1579. launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY);
  1580. }
  1581. /**
  1582. * write_bio_finished() - This is the bio_end_io function registered in write_block() to be called
  1583. * when a data_vio's write to the underlying storage has completed.
  1584. * @bio: The bio to update.
  1585. */
  1586. static void write_bio_finished(struct bio *bio)
  1587. {
  1588. struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private);
  1589. vdo_count_completed_bios(bio);
  1590. vdo_set_completion_result(&data_vio->vio.completion,
  1591. blk_status_to_errno(bio->bi_status));
  1592. data_vio->downgrade_allocation_lock = true;
  1593. update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock);
  1594. }
  1595. /** write_data_vio() - Write a data block to storage without compression. */
  1596. void write_data_vio(struct data_vio *data_vio)
  1597. {
  1598. struct data_vio_compression_status status, new_status;
  1599. int result;
  1600. if (!data_vio_has_allocation(data_vio)) {
  1601. /*
  1602. * There was no space to write this block and we failed to deduplicate or compress
  1603. * it.
  1604. */
  1605. continue_data_vio_with_error(data_vio, VDO_NO_SPACE);
  1606. return;
  1607. }
  1608. new_status = (struct data_vio_compression_status) {
  1609. .stage = DATA_VIO_POST_PACKER,
  1610. .may_not_compress = true,
  1611. };
  1612. do {
  1613. status = get_data_vio_compression_status(data_vio);
  1614. } while ((status.stage != DATA_VIO_POST_PACKER) &&
  1615. !set_data_vio_compression_status(data_vio, status, new_status));
  1616. /* Write the data from the data block buffer. */
  1617. result = vio_reset_bio(&data_vio->vio, data_vio->vio.data,
  1618. write_bio_finished, REQ_OP_WRITE,
  1619. data_vio->allocation.pbn);
  1620. if (result != VDO_SUCCESS) {
  1621. continue_data_vio_with_error(data_vio, result);
  1622. return;
  1623. }
  1624. data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO;
  1625. vdo_submit_data_vio(data_vio);
  1626. }
  1627. /**
  1628. * acknowledge_write_callback() - Acknowledge a write to the requestor.
  1629. * @completion: The data_vio.
  1630. *
  1631. * This callback is registered in allocate_block() and continue_write_with_block_map_slot().
  1632. */
  1633. static void acknowledge_write_callback(struct vdo_completion *completion)
  1634. {
  1635. struct data_vio *data_vio = as_data_vio(completion);
  1636. struct vdo *vdo = completion->vdo;
  1637. VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) ||
  1638. (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)),
  1639. "%s() called on bio ack queue", __func__);
  1640. VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio),
  1641. "write VIO to be acknowledged has a flush generation lock");
  1642. acknowledge_data_vio(data_vio);
  1643. if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
  1644. /* This is a zero write or discard */
  1645. update_metadata_for_data_vio_write(data_vio, NULL);
  1646. return;
  1647. }
  1648. prepare_for_dedupe(data_vio);
  1649. }
  1650. /**
  1651. * allocate_block() - Attempt to allocate a block in the current allocation zone.
  1652. * @completion: The data_vio.
  1653. *
  1654. * This callback is registered in continue_write_with_block_map_slot().
  1655. */
  1656. static void allocate_block(struct vdo_completion *completion)
  1657. {
  1658. struct data_vio *data_vio = as_data_vio(completion);
  1659. assert_data_vio_in_allocated_zone(data_vio);
  1660. if (!vdo_allocate_block_in_zone(data_vio))
  1661. return;
  1662. completion->error_handler = handle_data_vio_error;
  1663. WRITE_ONCE(data_vio->allocation_succeeded, true);
  1664. data_vio->new_mapped = (struct zoned_pbn) {
  1665. .zone = data_vio->allocation.zone,
  1666. .pbn = data_vio->allocation.pbn,
  1667. .state = VDO_MAPPING_STATE_UNCOMPRESSED,
  1668. };
  1669. if (data_vio->fua ||
  1670. data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
  1671. prepare_for_dedupe(data_vio);
  1672. return;
  1673. }
  1674. data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
  1675. launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
  1676. }
  1677. /**
  1678. * handle_allocation_error() - Handle an error attempting to allocate a block.
  1679. * @completion: The data_vio.
  1680. *
  1681. * This error handler is registered in continue_write_with_block_map_slot().
  1682. */
  1683. static void handle_allocation_error(struct vdo_completion *completion)
  1684. {
  1685. struct data_vio *data_vio = as_data_vio(completion);
  1686. if (completion->result == VDO_NO_SPACE) {
  1687. /* We failed to get an allocation, but we can try to dedupe. */
  1688. vdo_reset_completion(completion);
  1689. completion->error_handler = handle_data_vio_error;
  1690. prepare_for_dedupe(data_vio);
  1691. return;
  1692. }
  1693. /* We got a "real" error, not just a failure to allocate, so fail the request. */
  1694. handle_data_vio_error(completion);
  1695. }
  1696. static int assert_is_discard(struct data_vio *data_vio)
  1697. {
  1698. int result = VDO_ASSERT(data_vio->is_discard,
  1699. "data_vio with no block map page is a discard");
  1700. return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY);
  1701. }
  1702. /**
  1703. * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map.
  1704. * @completion: The data_vio to continue.
  1705. *
  1706. * This callback is registered in launch_read_data_vio().
  1707. */
  1708. void continue_data_vio_with_block_map_slot(struct vdo_completion *completion)
  1709. {
  1710. struct data_vio *data_vio = as_data_vio(completion);
  1711. assert_data_vio_in_logical_zone(data_vio);
  1712. if (data_vio->read) {
  1713. set_data_vio_logical_callback(data_vio, read_block);
  1714. data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ;
  1715. vdo_get_mapped_block(data_vio);
  1716. return;
  1717. }
  1718. vdo_acquire_flush_generation_lock(data_vio);
  1719. if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
  1720. /*
  1721. * This is a discard for a block on a block map page which has not been allocated, so
  1722. * there's nothing more we need to do.
  1723. */
  1724. completion->callback = complete_data_vio;
  1725. continue_data_vio_with_error(data_vio, assert_is_discard(data_vio));
  1726. return;
  1727. }
  1728. /*
  1729. * We need an allocation if this is neither a full-block discard nor a
  1730. * full-block zero write.
  1731. */
  1732. if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) {
  1733. data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block,
  1734. handle_allocation_error);
  1735. return;
  1736. }
  1737. /*
  1738. * We don't need to write any data, so skip allocation and just update the block map and
  1739. * reference counts (via the journal).
  1740. */
  1741. data_vio->new_mapped.pbn = VDO_ZERO_BLOCK;
  1742. if (data_vio->is_zero)
  1743. data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED;
  1744. if (data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
  1745. /* This is not the final block of a discard so we can't acknowledge it yet. */
  1746. update_metadata_for_data_vio_write(data_vio, NULL);
  1747. return;
  1748. }
  1749. data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
  1750. launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
  1751. }