Skip to content
Merged
168 changes: 111 additions & 57 deletions comtypes/test/test_storage.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import contextlib
import tempfile
import unittest
from _ctypes import COMError
from ctypes import HRESULT, POINTER, OleDLL, byref, c_ubyte
from ctypes.wintypes import DWORD, PWCHAR
from pathlib import Path
from typing import Optional

import comtypes
import comtypes.client

with contextlib.redirect_stdout(None): # supress warnings
mod = comtypes.client.GetModule("msvidctl.dll")
comtypes.client.GetModule("portabledeviceapi.dll")
from comtypes.gen.PortableDeviceApiLib import IStorage, tagSTATSTG

from comtypes.gen.MSVidCtlLib import IStorage
STGTY_STORAGE = 1

STATFLAG_DEFAULT = 0
STGC_DEFAULT = 0
Expand All @@ -26,7 +27,7 @@
STREAM_SEEK_SET = 0

STG_E_PATHNOTFOUND = -2147287038

STG_E_INVALIDFLAG = -2147286785

_ole32 = OleDLL("ole32")

Expand All @@ -36,28 +37,25 @@


class Test_IStorage(unittest.TestCase):
CREATE_DOC_FLAG = (
STGM_DIRECT
| STGM_READWRITE
| STGM_CREATE
| STGM_SHARE_EXCLUSIVE
| STGM_DELETEONRELEASE
)
CREATE_STM_FLAG = STGM_CREATE | STGM_READWRITE | STGM_SHARE_EXCLUSIVE
OPEN_STM_FLAG = STGM_READ | STGM_SHARE_EXCLUSIVE
CREATE_STG_FLAG = STGM_TRANSACTED | STGM_READWRITE | STGM_SHARE_EXCLUSIVE
OPEN_STG_FLAG = STGM_TRANSACTED | STGM_READWRITE | STGM_SHARE_EXCLUSIVE

def _create_docfile(self) -> IStorage:
RW_EXCLUSIVE = STGM_READWRITE | STGM_SHARE_EXCLUSIVE
RW_EXCLUSIVE_TX = RW_EXCLUSIVE | STGM_TRANSACTED
RW_EXCLUSIVE_CREATE = RW_EXCLUSIVE | STGM_CREATE
CREATE_TESTDOC = STGM_DIRECT | STGM_CREATE | RW_EXCLUSIVE
CREATE_TEMP_TESTDOC = CREATE_TESTDOC | STGM_DELETEONRELEASE

def _create_docfile(self, mode: int, name: Optional[str] = None) -> IStorage:
stg = POINTER(IStorage)()
_StgCreateDocfile(None, self.CREATE_DOC_FLAG, 0, byref(stg))
_StgCreateDocfile(name, mode, 0, byref(stg))
return stg # type: ignore

def test_CreateStream(self):
storage = self._create_docfile()
storage = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
# When created with `StgCreateDocfile(NULL, ...)`, `pwcsName` is a
# temporary filename. The file really exists on disk because Windows
# creates an actual temporary file for the compound storage.
filepath = Path(storage.Stat(STATFLAG_DEFAULT).pwcsName)
self.assertTrue(filepath.exists())
stream = storage.CreateStream("example", self.CREATE_STM_FLAG, 0, 0)
stream = storage.CreateStream("example", self.RW_EXCLUSIVE_CREATE, 0, 0)
test_data = b"Some data"
pv = (c_ubyte * len(test_data)).from_buffer(bytearray(test_data))
stream.RemoteWrite(pv, len(test_data))
Expand All @@ -70,64 +68,120 @@ def test_CreateStream(self):
del storage
self.assertFalse(filepath.exists())

# TODO: Auto-generated methods based on type info are remote-side and hard
# to call from the client.
# If a proper invocation method or workaround is found, testing
# becomes possible.
# See: https://github.com/enthought/comtypes/issues/607
# def test_RemoteOpenStream(self):
# pass

def test_CreateStorage(self):
parent = self._create_docfile()
child = parent.CreateStorage("child", self.CREATE_STG_FLAG, 0, 0)
parent = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
child = parent.CreateStorage("child", self.RW_EXCLUSIVE_TX, 0, 0)
self.assertEqual("child", child.Stat(STATFLAG_DEFAULT).pwcsName)

def test_OpenStorage(self):
parent = self._create_docfile()
created_child = parent.CreateStorage("child", self.CREATE_STG_FLAG, 0, 0)
del created_child
opened_child = parent.OpenStorage("child", None, self.OPEN_STG_FLAG, None, 0)
self.assertEqual("child", opened_child.Stat(STATFLAG_DEFAULT).pwcsName)
parent = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
with self.assertRaises(COMError) as cm:
parent.OpenStorage("child", None, self.RW_EXCLUSIVE_TX, None, 0)
self.assertEqual(cm.exception.hresult, STG_E_PATHNOTFOUND)
parent.CreateStorage("child", self.RW_EXCLUSIVE_TX, 0, 0)
child = parent.OpenStorage("child", None, self.RW_EXCLUSIVE_TX, None, 0)
self.assertEqual("child", child.Stat(STATFLAG_DEFAULT).pwcsName)

def test_RemoteCopyTo(self):
src_stg = self._create_docfile()
src_stg.CreateStorage("child", self.CREATE_STG_FLAG, 0, 0)
dst_stg = self._create_docfile()
src_stg = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
src_stg.CreateStorage("child", self.RW_EXCLUSIVE_TX, 0, 0)
dst_stg = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
src_stg.RemoteCopyTo(0, None, None, dst_stg)
src_stg.Commit(STGC_DEFAULT)
del src_stg
opened_stg = dst_stg.OpenStorage("child", None, self.OPEN_STG_FLAG, None, 0)
opened_stg = dst_stg.OpenStorage("child", None, self.RW_EXCLUSIVE_TX, None, 0)
self.assertEqual("child", opened_stg.Stat(STATFLAG_DEFAULT).pwcsName)

def test_MoveElementTo(self):
src_stg = self._create_docfile()
src_stg.CreateStorage("foo", self.CREATE_STG_FLAG, 0, 0)
dst_stg = self._create_docfile()
src_stg = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
src_stg.CreateStorage("foo", self.RW_EXCLUSIVE_TX, 0, 0)
dst_stg = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
src_stg.MoveElementTo("foo", dst_stg, "bar", STGMOVE_MOVE)
opened_stg = dst_stg.OpenStorage("bar", None, self.OPEN_STG_FLAG, None, 0)
opened_stg = dst_stg.OpenStorage("bar", None, self.RW_EXCLUSIVE_TX, None, 0)
self.assertEqual("bar", opened_stg.Stat(STATFLAG_DEFAULT).pwcsName)
with self.assertRaises(COMError) as ctx:
src_stg.OpenStorage("foo", None, self.OPEN_STG_FLAG, None, 0)
self.assertEqual(ctx.exception.hresult, STG_E_PATHNOTFOUND)
with self.assertRaises(COMError) as cm:
src_stg.OpenStorage("foo", None, self.RW_EXCLUSIVE_TX, None, 0)
self.assertEqual(cm.exception.hresult, STG_E_PATHNOTFOUND)

def test_Revert(self):
storage = self._create_docfile()
foo = storage.CreateStorage("foo", self.CREATE_STG_FLAG, 0, 0)
foo.CreateStorage("bar", self.CREATE_STG_FLAG, 0, 0)
bar = foo.OpenStorage("bar", None, self.OPEN_STG_FLAG, None, 0)
storage = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
foo = storage.CreateStorage("foo", self.RW_EXCLUSIVE_TX, 0, 0)
foo.CreateStorage("bar", self.RW_EXCLUSIVE_TX, 0, 0)
bar = foo.OpenStorage("bar", None, self.RW_EXCLUSIVE_TX, None, 0)
self.assertEqual("bar", bar.Stat(STATFLAG_DEFAULT).pwcsName)
foo.Revert()
with self.assertRaises(COMError) as ctx:
foo.OpenStorage("bar", None, self.OPEN_STG_FLAG, None, 0)
self.assertEqual(ctx.exception.hresult, STG_E_PATHNOTFOUND)
with self.assertRaises(COMError) as cm:
foo.OpenStorage("bar", None, self.RW_EXCLUSIVE_TX, None, 0)
self.assertEqual(cm.exception.hresult, STG_E_PATHNOTFOUND)

# TODO: Auto-generated methods based on type info are remote-side and hard
# to call from the client.
# If a proper invocation method or workaround is found, testing
# becomes possible.
# See: https://github.com/enthought/comtypes/issues/607
# def test_RemoteEnumElements(self):
# pass

def test_DestroyElement(self):
storage = self._create_docfile()
storage.CreateStorage("example", self.CREATE_STG_FLAG, 0, 0)
storage = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
storage.CreateStorage("example", self.RW_EXCLUSIVE_TX, 0, 0)
storage.DestroyElement("example")
with self.assertRaises(COMError) as ctx:
storage.OpenStorage("example", None, self.OPEN_STG_FLAG, None, 0)
self.assertEqual(ctx.exception.hresult, STG_E_PATHNOTFOUND)
with self.assertRaises(COMError) as cm:
storage.OpenStorage("example", None, self.RW_EXCLUSIVE_TX, None, 0)
self.assertEqual(cm.exception.hresult, STG_E_PATHNOTFOUND)

def test_RenameElement(self):
storage = self._create_docfile()
storage.CreateStorage("example", self.CREATE_STG_FLAG, 0, 0)
storage = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
storage.CreateStorage("example", self.RW_EXCLUSIVE_TX, 0, 0)
storage.RenameElement("example", "sample")
sample = storage.OpenStorage("sample", None, self.OPEN_STG_FLAG, None, 0)
sample = storage.OpenStorage("sample", None, self.RW_EXCLUSIVE_TX, None, 0)
self.assertEqual("sample", sample.Stat(STATFLAG_DEFAULT).pwcsName)
with self.assertRaises(COMError) as ctx:
storage.OpenStorage("example", None, self.OPEN_STG_FLAG, None, 0)
self.assertEqual(ctx.exception.hresult, STG_E_PATHNOTFOUND)
with self.assertRaises(COMError) as cm:
storage.OpenStorage("example", None, self.RW_EXCLUSIVE_TX, None, 0)
self.assertEqual(cm.exception.hresult, STG_E_PATHNOTFOUND)

def test_SetClass(self):
storage = self._create_docfile(mode=self.CREATE_TEMP_TESTDOC)
# Initial value is CLSID_NULL.
self.assertEqual(storage.Stat(STATFLAG_DEFAULT).clsid, comtypes.GUID())
new_clsid = comtypes.GUID.create_new()
storage.SetClass(new_clsid)
self.assertEqual(storage.Stat(STATFLAG_DEFAULT).clsid, new_clsid)
# Re-set CLSID to CLSID_NULL and verify it is correctly set.
storage.SetClass(comtypes.GUID())
self.assertEqual(storage.Stat(STATFLAG_DEFAULT).clsid, comtypes.GUID())

def test_Stat(self):
with tempfile.TemporaryDirectory() as t:
tmpdir = Path(t)
tmpfile = tmpdir / "test_docfile.cfs"
self.assertFalse(tmpfile.exists())
# When created with `StgCreateDocfile(filepath_string, ...)`, the
# compound file is created at that location.
storage = self._create_docfile(
name=str(tmpfile), mode=self.CREATE_TEMP_TESTDOC
)
self.assertTrue(tmpfile.exists())
with self.assertRaises(COMError) as cm:
storage.Stat(0xFFFFFFFF) # Invalid flag
self.assertEqual(cm.exception.hresult, STG_E_INVALIDFLAG)
stat = storage.Stat(STATFLAG_DEFAULT)
self.assertIsInstance(stat, tagSTATSTG)
del storage # Release the storage to prevent 'cannot access the file ...'
self.assertEqual(stat.type, STGTY_STORAGE)
# Due to header overhead and file system allocation, the size may be
# greater than 0 bytes.
self.assertGreaterEqual(stat.cbSize, 0)
# `grfMode` should reflect the access mode flags from creation.
self.assertEqual(stat.grfMode, self.RW_EXCLUSIVE | STGM_DIRECT)
self.assertEqual(stat.grfLocksSupported, 0)
self.assertEqual(stat.clsid, comtypes.GUID()) # CLSID_NULL for new creation.
self.assertEqual(stat.grfStateBits, 0)
Loading