# tpm2_tests.py

# SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause)
from argparse import ArgumentParser
from argparse import FileType
import errno
import logging
import os
import struct
import sys
import unittest

import tpm2
from tpm2 import ProtocolError
  11. class SmokeTest(unittest.TestCase):
  12. def setUp(self):
  13. self.client = tpm2.Client()
  14. self.root_key = self.client.create_root_key()
  15. def tearDown(self):
  16. self.client.flush_context(self.root_key)
  17. self.client.close()
  18. def test_seal_with_auth(self):
  19. data = ('X' * 64).encode()
  20. auth = ('A' * 15).encode()
  21. blob = self.client.seal(self.root_key, data, auth, None)
  22. result = self.client.unseal(self.root_key, blob, auth, None)
  23. self.assertEqual(data, result)
  24. def determine_bank_alg(self, mask):
  25. pcr_banks = self.client.get_cap_pcrs()
  26. for bank_alg, pcrSelection in pcr_banks.items():
  27. if pcrSelection & mask == mask:
  28. return bank_alg
  29. return None
  30. def test_seal_with_policy(self):
  31. bank_alg = self.determine_bank_alg(1 << 16)
  32. self.assertIsNotNone(bank_alg)
  33. handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
  34. data = ('X' * 64).encode()
  35. auth = ('A' * 15).encode()
  36. pcrs = [16]
  37. try:
  38. self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
  39. self.client.policy_password(handle)
  40. policy_dig = self.client.get_policy_digest(handle)
  41. finally:
  42. self.client.flush_context(handle)
  43. blob = self.client.seal(self.root_key, data, auth, policy_dig)
  44. handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
  45. try:
  46. self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
  47. self.client.policy_password(handle)
  48. result = self.client.unseal(self.root_key, blob, auth, handle)
  49. except:
  50. self.client.flush_context(handle)
  51. raise
  52. self.assertEqual(data, result)
  53. def test_unseal_with_wrong_auth(self):
  54. data = ('X' * 64).encode()
  55. auth = ('A' * 20).encode()
  56. rc = 0
  57. blob = self.client.seal(self.root_key, data, auth, None)
  58. try:
  59. result = self.client.unseal(self.root_key, blob,
  60. auth[:-1] + 'B'.encode(), None)
  61. except ProtocolError as e:
  62. rc = e.rc
  63. self.assertEqual(rc, tpm2.TPM2_RC_AUTH_FAIL)
  64. def test_unseal_with_wrong_policy(self):
  65. bank_alg = self.determine_bank_alg(1 << 16 | 1 << 1)
  66. self.assertIsNotNone(bank_alg)
  67. handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
  68. data = ('X' * 64).encode()
  69. auth = ('A' * 17).encode()
  70. pcrs = [16]
  71. try:
  72. self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
  73. self.client.policy_password(handle)
  74. policy_dig = self.client.get_policy_digest(handle)
  75. finally:
  76. self.client.flush_context(handle)
  77. blob = self.client.seal(self.root_key, data, auth, policy_dig)
  78. # Extend first a PCR that is not part of the policy and try to unseal.
  79. # This should succeed.
  80. ds = tpm2.get_digest_size(bank_alg)
  81. self.client.extend_pcr(1, ('X' * ds).encode(), bank_alg=bank_alg)
  82. handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
  83. try:
  84. self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
  85. self.client.policy_password(handle)
  86. result = self.client.unseal(self.root_key, blob, auth, handle)
  87. except:
  88. self.client.flush_context(handle)
  89. raise
  90. self.assertEqual(data, result)
  91. # Then, extend a PCR that is part of the policy and try to unseal.
  92. # This should fail.
  93. self.client.extend_pcr(16, ('X' * ds).encode(), bank_alg=bank_alg)
  94. handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
  95. rc = 0
  96. try:
  97. self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
  98. self.client.policy_password(handle)
  99. result = self.client.unseal(self.root_key, blob, auth, handle)
  100. except ProtocolError as e:
  101. rc = e.rc
  102. self.client.flush_context(handle)
  103. except:
  104. self.client.flush_context(handle)
  105. raise
  106. self.assertEqual(rc, tpm2.TPM2_RC_POLICY_FAIL)
  107. def test_seal_with_too_long_auth(self):
  108. ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1)
  109. data = ('X' * 64).encode()
  110. auth = ('A' * (ds + 1)).encode()
  111. rc = 0
  112. try:
  113. blob = self.client.seal(self.root_key, data, auth, None)
  114. except ProtocolError as e:
  115. rc = e.rc
  116. self.assertEqual(rc, tpm2.TPM2_RC_SIZE)
  117. def test_too_short_cmd(self):
  118. rejected = False
  119. try:
  120. fmt = '>HIII'
  121. cmd = struct.pack(fmt,
  122. tpm2.TPM2_ST_NO_SESSIONS,
  123. struct.calcsize(fmt) + 1,
  124. tpm2.TPM2_CC_FLUSH_CONTEXT,
  125. 0xDEADBEEF)
  126. self.client.send_cmd(cmd)
  127. except IOError as e:
  128. rejected = True
  129. except:
  130. pass
  131. self.assertEqual(rejected, True)
  132. def test_read_partial_resp(self):
  133. try:
  134. fmt = '>HIIH'
  135. cmd = struct.pack(fmt,
  136. tpm2.TPM2_ST_NO_SESSIONS,
  137. struct.calcsize(fmt),
  138. tpm2.TPM2_CC_GET_RANDOM,
  139. 0x20)
  140. self.client.tpm.write(cmd)
  141. hdr = self.client.tpm.read(10)
  142. sz = struct.unpack('>I', hdr[2:6])[0]
  143. rsp = self.client.tpm.read()
  144. except:
  145. pass
  146. self.assertEqual(sz, 10 + 2 + 32)
  147. self.assertEqual(len(rsp), 2 + 32)
  148. def test_read_partial_overwrite(self):
  149. try:
  150. fmt = '>HIIH'
  151. cmd = struct.pack(fmt,
  152. tpm2.TPM2_ST_NO_SESSIONS,
  153. struct.calcsize(fmt),
  154. tpm2.TPM2_CC_GET_RANDOM,
  155. 0x20)
  156. self.client.tpm.write(cmd)
  157. # Read part of the respone
  158. rsp1 = self.client.tpm.read(15)
  159. # Send a new cmd
  160. self.client.tpm.write(cmd)
  161. # Read the whole respone
  162. rsp2 = self.client.tpm.read()
  163. except:
  164. pass
  165. self.assertEqual(len(rsp1), 15)
  166. self.assertEqual(len(rsp2), 10 + 2 + 32)
  167. def test_send_two_cmds(self):
  168. rejected = False
  169. try:
  170. fmt = '>HIIH'
  171. cmd = struct.pack(fmt,
  172. tpm2.TPM2_ST_NO_SESSIONS,
  173. struct.calcsize(fmt),
  174. tpm2.TPM2_CC_GET_RANDOM,
  175. 0x20)
  176. self.client.tpm.write(cmd)
  177. # expect the second one to raise -EBUSY error
  178. self.client.tpm.write(cmd)
  179. rsp = self.client.tpm.read()
  180. except IOError as e:
  181. # read the response
  182. rsp = self.client.tpm.read()
  183. rejected = True
  184. pass
  185. except:
  186. pass
  187. self.assertEqual(rejected, True)
  188. class SpaceTest(unittest.TestCase):
  189. def setUp(self):
  190. logging.basicConfig(filename='SpaceTest.log', level=logging.DEBUG)
  191. def test_make_two_spaces(self):
  192. log = logging.getLogger(__name__)
  193. log.debug("test_make_two_spaces")
  194. space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
  195. root1 = space1.create_root_key()
  196. space2 = tpm2.Client(tpm2.Client.FLAG_SPACE)
  197. root2 = space2.create_root_key()
  198. root3 = space2.create_root_key()
  199. log.debug("%08x" % (root1))
  200. log.debug("%08x" % (root2))
  201. log.debug("%08x" % (root3))
  202. def test_flush_context(self):
  203. log = logging.getLogger(__name__)
  204. log.debug("test_flush_context")
  205. space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
  206. root1 = space1.create_root_key()
  207. log.debug("%08x" % (root1))
  208. space1.flush_context(root1)
  209. def test_get_handles(self):
  210. log = logging.getLogger(__name__)
  211. log.debug("test_get_handles")
  212. space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
  213. space1.create_root_key()
  214. space2 = tpm2.Client(tpm2.Client.FLAG_SPACE)
  215. space2.create_root_key()
  216. space2.create_root_key()
  217. handles = space2.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_TRANSIENT)
  218. self.assertEqual(len(handles), 2)
  219. log.debug("%08x" % (handles[0]))
  220. log.debug("%08x" % (handles[1]))
  221. def test_invalid_cc(self):
  222. log = logging.getLogger(__name__)
  223. log.debug(sys._getframe().f_code.co_name)
  224. TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 1
  225. space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
  226. root1 = space1.create_root_key()
  227. log.debug("%08x" % (root1))
  228. fmt = '>HII'
  229. cmd = struct.pack(fmt, tpm2.TPM2_ST_NO_SESSIONS, struct.calcsize(fmt),
  230. TPM2_CC_INVALID)
  231. rc = 0
  232. try:
  233. space1.send_cmd(cmd)
  234. except ProtocolError as e:
  235. rc = e.rc
  236. self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE |
  237. tpm2.TSS2_RESMGR_TPM_RC_LAYER)
  238. class AsyncTest(unittest.TestCase):
  239. def setUp(self):
  240. logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG)
  241. def test_async(self):
  242. log = logging.getLogger(__name__)
  243. log.debug(sys._getframe().f_code.co_name)
  244. async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK)
  245. log.debug("Calling get_cap in a NON_BLOCKING mode")
  246. async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION)
  247. async_client.close()
  248. def test_flush_invalid_context(self):
  249. log = logging.getLogger(__name__)
  250. log.debug(sys._getframe().f_code.co_name)
  251. async_client = tpm2.Client(tpm2.Client.FLAG_SPACE | tpm2.Client.FLAG_NONBLOCK)
  252. log.debug("Calling flush_context passing in an invalid handle ")
  253. handle = 0x80123456
  254. rc = 0
  255. try:
  256. async_client.flush_context(handle)
  257. except OSError as e:
  258. rc = e.errno
  259. self.assertEqual(rc, 22)
  260. async_client.close()