| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- # SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause)
- from argparse import ArgumentParser
- from argparse import FileType
- import os
- import sys
- import tpm2
- from tpm2 import ProtocolError
- import unittest
- import logging
- import struct
- class SmokeTest(unittest.TestCase):
- def setUp(self):
- self.client = tpm2.Client()
- self.root_key = self.client.create_root_key()
- def tearDown(self):
- self.client.flush_context(self.root_key)
- self.client.close()
- def test_seal_with_auth(self):
- data = ('X' * 64).encode()
- auth = ('A' * 15).encode()
- blob = self.client.seal(self.root_key, data, auth, None)
- result = self.client.unseal(self.root_key, blob, auth, None)
- self.assertEqual(data, result)
- def determine_bank_alg(self, mask):
- pcr_banks = self.client.get_cap_pcrs()
- for bank_alg, pcrSelection in pcr_banks.items():
- if pcrSelection & mask == mask:
- return bank_alg
- return None
- def test_seal_with_policy(self):
- bank_alg = self.determine_bank_alg(1 << 16)
- self.assertIsNotNone(bank_alg)
- handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
- data = ('X' * 64).encode()
- auth = ('A' * 15).encode()
- pcrs = [16]
- try:
- self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
- self.client.policy_password(handle)
- policy_dig = self.client.get_policy_digest(handle)
- finally:
- self.client.flush_context(handle)
- blob = self.client.seal(self.root_key, data, auth, policy_dig)
- handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
- try:
- self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
- self.client.policy_password(handle)
- result = self.client.unseal(self.root_key, blob, auth, handle)
- except:
- self.client.flush_context(handle)
- raise
- self.assertEqual(data, result)
- def test_unseal_with_wrong_auth(self):
- data = ('X' * 64).encode()
- auth = ('A' * 20).encode()
- rc = 0
- blob = self.client.seal(self.root_key, data, auth, None)
- try:
- result = self.client.unseal(self.root_key, blob,
- auth[:-1] + 'B'.encode(), None)
- except ProtocolError as e:
- rc = e.rc
- self.assertEqual(rc, tpm2.TPM2_RC_AUTH_FAIL)
- def test_unseal_with_wrong_policy(self):
- bank_alg = self.determine_bank_alg(1 << 16 | 1 << 1)
- self.assertIsNotNone(bank_alg)
- handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
- data = ('X' * 64).encode()
- auth = ('A' * 17).encode()
- pcrs = [16]
- try:
- self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
- self.client.policy_password(handle)
- policy_dig = self.client.get_policy_digest(handle)
- finally:
- self.client.flush_context(handle)
- blob = self.client.seal(self.root_key, data, auth, policy_dig)
- # Extend first a PCR that is not part of the policy and try to unseal.
- # This should succeed.
- ds = tpm2.get_digest_size(bank_alg)
- self.client.extend_pcr(1, ('X' * ds).encode(), bank_alg=bank_alg)
- handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
- try:
- self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
- self.client.policy_password(handle)
- result = self.client.unseal(self.root_key, blob, auth, handle)
- except:
- self.client.flush_context(handle)
- raise
- self.assertEqual(data, result)
- # Then, extend a PCR that is part of the policy and try to unseal.
- # This should fail.
- self.client.extend_pcr(16, ('X' * ds).encode(), bank_alg=bank_alg)
- handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
- rc = 0
- try:
- self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
- self.client.policy_password(handle)
- result = self.client.unseal(self.root_key, blob, auth, handle)
- except ProtocolError as e:
- rc = e.rc
- self.client.flush_context(handle)
- except:
- self.client.flush_context(handle)
- raise
- self.assertEqual(rc, tpm2.TPM2_RC_POLICY_FAIL)
- def test_seal_with_too_long_auth(self):
- ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1)
- data = ('X' * 64).encode()
- auth = ('A' * (ds + 1)).encode()
- rc = 0
- try:
- blob = self.client.seal(self.root_key, data, auth, None)
- except ProtocolError as e:
- rc = e.rc
- self.assertEqual(rc, tpm2.TPM2_RC_SIZE)
- def test_too_short_cmd(self):
- rejected = False
- try:
- fmt = '>HIII'
- cmd = struct.pack(fmt,
- tpm2.TPM2_ST_NO_SESSIONS,
- struct.calcsize(fmt) + 1,
- tpm2.TPM2_CC_FLUSH_CONTEXT,
- 0xDEADBEEF)
- self.client.send_cmd(cmd)
- except IOError as e:
- rejected = True
- except:
- pass
- self.assertEqual(rejected, True)
- def test_read_partial_resp(self):
- try:
- fmt = '>HIIH'
- cmd = struct.pack(fmt,
- tpm2.TPM2_ST_NO_SESSIONS,
- struct.calcsize(fmt),
- tpm2.TPM2_CC_GET_RANDOM,
- 0x20)
- self.client.tpm.write(cmd)
- hdr = self.client.tpm.read(10)
- sz = struct.unpack('>I', hdr[2:6])[0]
- rsp = self.client.tpm.read()
- except:
- pass
- self.assertEqual(sz, 10 + 2 + 32)
- self.assertEqual(len(rsp), 2 + 32)
- def test_read_partial_overwrite(self):
- try:
- fmt = '>HIIH'
- cmd = struct.pack(fmt,
- tpm2.TPM2_ST_NO_SESSIONS,
- struct.calcsize(fmt),
- tpm2.TPM2_CC_GET_RANDOM,
- 0x20)
- self.client.tpm.write(cmd)
- # Read part of the respone
- rsp1 = self.client.tpm.read(15)
- # Send a new cmd
- self.client.tpm.write(cmd)
- # Read the whole respone
- rsp2 = self.client.tpm.read()
- except:
- pass
- self.assertEqual(len(rsp1), 15)
- self.assertEqual(len(rsp2), 10 + 2 + 32)
- def test_send_two_cmds(self):
- rejected = False
- try:
- fmt = '>HIIH'
- cmd = struct.pack(fmt,
- tpm2.TPM2_ST_NO_SESSIONS,
- struct.calcsize(fmt),
- tpm2.TPM2_CC_GET_RANDOM,
- 0x20)
- self.client.tpm.write(cmd)
- # expect the second one to raise -EBUSY error
- self.client.tpm.write(cmd)
- rsp = self.client.tpm.read()
- except IOError as e:
- # read the response
- rsp = self.client.tpm.read()
- rejected = True
- pass
- except:
- pass
- self.assertEqual(rejected, True)
- class SpaceTest(unittest.TestCase):
- def setUp(self):
- logging.basicConfig(filename='SpaceTest.log', level=logging.DEBUG)
- def test_make_two_spaces(self):
- log = logging.getLogger(__name__)
- log.debug("test_make_two_spaces")
- space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
- root1 = space1.create_root_key()
- space2 = tpm2.Client(tpm2.Client.FLAG_SPACE)
- root2 = space2.create_root_key()
- root3 = space2.create_root_key()
- log.debug("%08x" % (root1))
- log.debug("%08x" % (root2))
- log.debug("%08x" % (root3))
- def test_flush_context(self):
- log = logging.getLogger(__name__)
- log.debug("test_flush_context")
- space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
- root1 = space1.create_root_key()
- log.debug("%08x" % (root1))
- space1.flush_context(root1)
- def test_get_handles(self):
- log = logging.getLogger(__name__)
- log.debug("test_get_handles")
- space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
- space1.create_root_key()
- space2 = tpm2.Client(tpm2.Client.FLAG_SPACE)
- space2.create_root_key()
- space2.create_root_key()
- handles = space2.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_TRANSIENT)
- self.assertEqual(len(handles), 2)
- log.debug("%08x" % (handles[0]))
- log.debug("%08x" % (handles[1]))
- def test_invalid_cc(self):
- log = logging.getLogger(__name__)
- log.debug(sys._getframe().f_code.co_name)
- TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 1
- space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
- root1 = space1.create_root_key()
- log.debug("%08x" % (root1))
- fmt = '>HII'
- cmd = struct.pack(fmt, tpm2.TPM2_ST_NO_SESSIONS, struct.calcsize(fmt),
- TPM2_CC_INVALID)
- rc = 0
- try:
- space1.send_cmd(cmd)
- except ProtocolError as e:
- rc = e.rc
- self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE |
- tpm2.TSS2_RESMGR_TPM_RC_LAYER)
- class AsyncTest(unittest.TestCase):
- def setUp(self):
- logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG)
- def test_async(self):
- log = logging.getLogger(__name__)
- log.debug(sys._getframe().f_code.co_name)
- async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK)
- log.debug("Calling get_cap in a NON_BLOCKING mode")
- async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION)
- async_client.close()
- def test_flush_invalid_context(self):
- log = logging.getLogger(__name__)
- log.debug(sys._getframe().f_code.co_name)
- async_client = tpm2.Client(tpm2.Client.FLAG_SPACE | tpm2.Client.FLAG_NONBLOCK)
- log.debug("Calling flush_context passing in an invalid handle ")
- handle = 0x80123456
- rc = 0
- try:
- async_client.flush_context(handle)
- except OSError as e:
- rc = e.errno
- self.assertEqual(rc, 22)
- async_client.close()
|