• Re: Futex Stack Test...

    From Paavo Helde@21:1/5 to Chris M. Thomasson on Tue Feb 18 09:01:55 2025
    On 18.02.2025 01:17, Chris M. Thomasson wrote:
    This is a little C++20 test using a futex to allow one to wait on a
    lock-free stack. The main stack logic is in struct ct_stack. Well, can
    you get to compile and run? Thanks...

    Seemed to work fine, but compilation produced some warnings about the
    CT_WAIT macro.


    Build started at 08:55...
    ------ Build started: Project: ConsoleTest2022, Configuration: Release
    x64 ------
    futex-stack-test.cpp
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(55,25): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(71,21): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(87,38): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(90,40): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(92,39): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(94,33): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(185,38): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    Generating code
    77 of 78 functions (98.7%) were compiled, the rest were copied from
    previous compilation.
    66 functions were new in current compilation
    0 functions had inline decision re-evaluated but remain unchanged
    Finished generating code
    ConsoleTest2022.vcxproj -> C:\Test\ConsoleTestVS2022\ConsoleTest2022\x64\Release\ConsoleTest2022.exe 1>Done building project "ConsoleTest2022.vcxproj".
    ========== Build: 1 succeeded, 0 failed, 0 up-to-date, 0 skipped ==========


    Futex Stack Test
    by: Chris M. Thomasson
    version: (0.0.1)
    _________________________________

    CT_THREAD_N = 42
    CT_WORK_N = 10000000
    CT_WAIT = 00000000DEADBEEF
    ct_shared::ct_shared()


    Launching threads...


    Generate work...


    Completed all work!

    Sending shutdown state...


    Threads joined...

    Dump shutdown state...


    Shutdown complete!

    ct_shared::~ct_shared()

    g_ct_work_alloations = 10000042
    g_ct_work_dealloations = 10000042

    Fin!



    I still need to check my algorithm out in Relacy. I think it might be
    able to model futex.


    My code:
    ______________________________________
    #include <algorithm>
    #include <atomic>
    #include <cassert>
    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <thread>


    // Number of worker threads launched by main().
    #define CT_THREAD_N (42)
    // Number of regular work items main() pushes onto the input stack.
    #define CT_WORK_N (10000000)
    // Sentinel head value used by ct_stack as a tag (it is compared against and
    // passed to atomic wait, not treated as a real node).
    // The literal is widened to std::uintptr_t first so that a 64-bit build does
    // not convert a 32-bit 'unsigned int' directly to a wider pointer (MSVC
    // warning C4312); reinterpret_cast states the integer-to-pointer intent.
    #define CT_WAIT (reinterpret_cast<ct_node*>(static_cast<std::uintptr_t>(0xDEADBEEF)))


    // Intrusive singly-linked list node; the stack links nodes through m_next.
    struct ct_node
    {
        // Empty user-provided constructor; the trace statement stays disabled.
        ct_node()
        {
            // std::cout << "(" << this << ")::ct_node::ct_node()\n";
        }

        // Empty user-provided destructor; the trace statement stays disabled.
        ~ct_node()
        {
            // std::cout << "(" << this << ")::ct_node::~ct_node()\n";
        }

        // Next node in the chain, or nullptr at the end of the chain.
        ct_node* m_next{ nullptr };
    };


    // Stack of ct_node built on a single atomic head pointer.
    // m_head holds one of: nullptr (empty), CT_WAIT (sentinel installed by
    // flush_wait() before it blocks), or a pointer to the top node.
    struct ct_stack
    {
        std::atomic<ct_node*> m_head = { nullptr };

        // Pushes `node` on top of the stack.
        // If the head value that was replaced was CT_WAIT, notify_one() is
        // called on m_head afterwards.
        void
        push(
            ct_node* node
        ) {
            ct_node* head = m_head.load(std::memory_order_relaxed);

            do
            {
                if (head == CT_WAIT)
                {
                    // The sentinel is not a node: the new node ends the chain.
                    node->m_next = nullptr;
                }

                else
                {
                    node->m_next = head;
                }

            // On failure compare_exchange_weak stores the current head into
            // `head`, so the link above is recomputed on the next iteration.
            } while (! m_head.compare_exchange_weak(
                head,
                node,
                std::memory_order_release)
              );

            // `head` now holds the value that was replaced by `node`.
            if (head == CT_WAIT)
            {
                // slow path...
                m_head.notify_one();
            }
        }

        // Detaches and returns the whole chain currently on the stack,
        // blocking (via atomic wait) until there is at least one node.
        // The returned pointer is neither nullptr nor CT_WAIT (asserted).
        ct_node*
        flush_wait()
        {
            ct_node* head = nullptr;

            // The body runs once: the unconditional break below leaves the
            // loop as soon as the inner while loop has produced a real node.
            for (;;)
            {
                // Fast path: take whatever is there and leave the stack empty.
                head = m_head.exchange(nullptr, std::memory_order_acquire);

                while (! head || head == CT_WAIT)
                {
                    // slow path...
                    // Install the sentinel so that push() knows to notify.
                    head = m_head.exchange(CT_WAIT, std::memory_order_acquire);

                    if (! head || head == CT_WAIT)
                    {
                        // Still nothing: block while m_head == CT_WAIT, then
                        // re-test the inner loop condition and retry.
                        m_head.wait(CT_WAIT, std::memory_order_relaxed);
                        continue;
                    }
                }

                break;
            }

            assert(head && head != CT_WAIT);

            return head;
        }
    };


    // Work item: a list node plus a state word. ct_thread() treats the value
    // 666 as the shutdown marker; new items start at 0.
    struct ct_work : ct_node
    {
        unsigned long m_state{ 0 };
    };


    struct ct_shared
    {
        ct_stack m_stack_in;
        ct_stack m_stack_out;

        ct_shared()
        {
            std::cout << "ct_shared::ct_shared()\n" << std::endl;
        }

        ~ct_shared()
        {
            assert(! m_stack_in.m_head || m_stack_in.m_head == CT_WAIT);
            assert(! m_stack_out.m_head || m_stack_out.m_head == CT_WAIT);

            std::cout << "ct_shared::~ct_shared()\n" << std::endl;
        }
    };


    void
    ct_thread(
        ct_shared& shared
    ) {
        unsigned long state = 0;

        while (! state)
        {
            ct_work* head = (ct_work*)shared.m_stack_in.flush_wait();

            while (head)
            {
                ct_work* next = (ct_work*)head->m_next;

                if (head->m_state == 666)
                {
                    // Shutdown detected...
                    state = 1;
                    shared.m_stack_in.push(head);
                }

                else
                {
                    shared.m_stack_out.push(head);
                }

                head = next;
            }
        }

        //std::cout << "shutdown..." << std::endl;
    }


    int
    main()
    {
        {
            std::cout << "Futex Stack Test\n";
            std::cout << "by: Chris M. Thomasson\n";
            std::cout << "version: (0.0.1)\n";
            std::cout << "_________________________________\n" << std::endl;
        }

        unsigned long g_ct_work_alloations = 0;
        unsigned long g_ct_work_dealloations = 0;

        {
            std::cout << "CT_THREAD_N = " << CT_THREAD_N << std::endl;
            std::cout << "CT_WORK_N = " << CT_WORK_N << std::endl;
            std::cout << "CT_WAIT = " << CT_WAIT << std::endl;
        }

        {
            ct_shared shared = { };

            std::thread threads[CT_THREAD_N];

            std::cout << "\nLaunching threads...\n" << std::endl;

            for (unsigned long i = 0; i < CT_THREAD_N; ++i)
            {
                threads[i] = std::thread(ct_thread, std::ref(shared));
            }

            //std::this_thread::sleep_for(std::chrono::seconds(2));

            std::cout << "\nGenerate work...\n" << std::endl;

            // Generate work...
            {
                for (unsigned long i = 0; i < CT_WORK_N; ++i)
                {
                    shared.m_stack_in.push(new ct_work);
                    ++g_ct_work_alloations;
                }
            }

            // Wait for work...
            {
                unsigned long wn = 0;

                while (wn < CT_WORK_N)
                {
                    ct_work* head = (ct_work*)shared.m_stack_out.flush_wait();

                    while (head)
                    {
                        ct_work* next = (ct_work*)head->m_next;
                        delete head;
                        ++g_ct_work_dealloations;
                        ++wn;
                        head = next;
                    }
                }
            }

            std::cout << "\nCompleted all work!\n" << std::endl;
            std::cout << "Sending shutdown state...\n" << std::endl;

            // Send shutdown state...
            {
                for (unsigned long i = 0; i < CT_THREAD_N; ++i)
                {
                    ct_work* w = new ct_work;
                    ++g_ct_work_alloations;
                    w->m_state = 666;
                    shared.m_stack_in.push(w);
                }
            }

            // Join threads...
            {
                for (unsigned long i = 0; i < CT_THREAD_N; ++i)
                {
                    threads[i].join();
                }
            }

            std::cout << "\nThreads joined...\n" << std::endl;

            // Dump shutdown state...
            std::cout << "Dump shutdown state...\n" << std::endl;

            {
                ct_work* head = (ct_work*)shared.m_stack_in.m_head.load(std::memory_order_relaxed);
                shared.m_stack_in.m_head = nullptr;

                while (head)
                {
                    ct_work* next = (ct_work*)head->m_next;
                    delete head;
                    ++g_ct_work_dealloations;
                    head = next;
                }
            }

            std::cout << "\nShutdown complete!\n" << std::endl;
        }

        // Sanity check...
        {
            std::cout << "g_ct_work_alloations = " << g_ct_work_alloations
    << "\n";
            std::cout << "g_ct_work_dealloations = " << g_ct_work_dealloations << std::endl;

            if (g_ct_work_alloations != g_ct_work_dealloations)
            {
                std::cout << "Pardon my French, but shit!!!!!!!!!!!!!! NOT
    GOOD\n" << std::endl;
                std::cout << "DAMN IT!!!! Grrrrr\n" << std::endl;
            }
        }

        std::cout << "\nFin!\n" << std::endl;

        return 0;
    }
    ______________________________________




    My output:
    _______________________
    Futex Stack Test
    by: Chris M. Thomasson
    version: (0.0.1)
    _________________________________

    CT_THREAD_N = 42
    CT_WORK_N = 10000000
    CT_WAIT = 00000000DEADBEEF
    ct_shared::ct_shared()


    Launching threads...


    Generate work...


    Completed all work!

    Sending shutdown state...


    Threads joined...

    Dump shutdown state...


    Shutdown complete!

    ct_shared::~ct_shared()

    g_ct_work_alloations = 10000042
    g_ct_work_dealloations = 10000042

    Fin!
    _______________________


    Well, any luck?

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From James Kuyper@21:1/5 to Paavo Helde on Tue Feb 18 14:13:53 2025
    On 2/18/25 02:01, Paavo Helde wrote:
    On 18.02.2025 01:17, Chris M. Thomasson wrote:
    ...
    Seemed to work fine, but compilation produced some warnings about the
    CT_WAIT macro.


    Build started at 08:55...
    ------ Build started: Project: ConsoleTest2022, Configuration: Release
    x64 ------
    futex-stack-test.cpp
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(55,25): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(71,21): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(87,38): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(90,40): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(92,39): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(94,33): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    1>C:\Test\ConsoleTestVS2022\ConsoleTest2022\futex-stack-test.cpp(185,38): warning C4312: 'type cast': conversion from 'unsigned int' to 'ct_node *' of greater size
    ...
    #define CT_WAIT ((ct_node*)0xDEADBEEF)

    That should be a reinterpret_cast<>. Correcting that gets rid of all of
    those messages.

    Also, the following expressions should all use static_cast<>:
            ct_work* head = (ct_work*)shared.m_stack_in.flush_wait(); ...
                ct_work* next = (ct_work*)head->m_next;
    ...
                    ct_work* head = (ct_work*)shared.m_stack_out.flush_wait();
    ...
                        ct_work* next = (ct_work*)head->m_next;
    ...
                ct_work* head =
    (ct_work*)shared.m_stack_in.m_head.load(std::memory_order_relaxed);
    ...
                    ct_work* next = (ct_work*)head->m_next

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From Wuns Haerst@21:1/5 to All on Fri May 2 13:01:49 2025
    It's not hard to implement a lock-free stack with DW-CAS,
    so no need for futexes. That's my dcas_atomic:

    #pragma once
    #include <cstdint>
    #include <utility>
    #include <atomic>
    #include <type_traits>
    #include <climits>
    #include <bit>
    #include <memory>

    #if defined(_MSC_VER)
    #pragma warning(push)
    #pragma warning(disable: 26495)
    #endif
    #if defined(__llvm__)
    #pragma clang diagnostic push
    #pragma clang diagnostic ignored "-Watomic-alignment"
    #endif

    // Integer tags with the same values as the std::memory_order enumerators.
    // The memory_order_xxx constants are used because they exist in every
    // standard since C++11, unlike the scoped std::memory_order::xxx spelling.
    constexpr int DCAS_ACQ_REL = static_cast<int>(std::memory_order_acq_rel);
    constexpr int DCAS_ACQUIRE = static_cast<int>(std::memory_order_acquire);
    constexpr int DCAS_RELAXED = static_cast<int>(std::memory_order_relaxed);
    constexpr int DCAS_RELEASE = static_cast<int>(std::memory_order_release);
    constexpr int DCAS_SEQ_CST = static_cast<int>(std::memory_order_seq_cst);

    // Tag type that carries one DCAS_* constant as a compile-time value; the
    // dcas_atomic member templates take it to select the memory ordering.
    template<int Type>
    using dcas_type = std::integral_constant<int, Type>;
    using dcas_acq_rel = dcas_type<DCAS_ACQ_REL>;
    using dcas_acquire = dcas_type<DCAS_ACQUIRE>;
    using dcas_relaxed = dcas_type<DCAS_RELAXED>;
    using dcas_release = dcas_type<DCAS_RELEASE>;
    using dcas_seq_cst = dcas_type<DCAS_SEQ_CST>;

    struct dcas_atomic
    {
    static_assert(sizeof(size_t) == 8 || sizeof(size_t) == 4, "must be 64 or 32 bit architecture");
    struct pair_t { size_t first, second; };
    dcas_atomic() = default;
    dcas_atomic( pair_t const &desired );
    std::atomic<size_t> &first() noexcept;
    std::atomic<size_t> &second() noexcept;
    operator pair_t() noexcept;
    template<int Type>
    bool compare_exchange( pair_t &expected, pair_t const &desired, dcas_type<Type> = dcas_seq_cst() ) noexcept;
    template<int Type>
    pair_t load( dcas_type<Type> = dcas_seq_cst() ) const noexcept;
    template<int Type>
    void store( pair_t const &niu, dcas_type<Type> = dcas_seq_cst() ) noexcept;
    private:
    #if defined(__GNUC__) || defined(__clang__) && !defined(_MSC_VER)
    #if SIZE_WIDTH == 64
    using dword_t = unsigned __int128;
    #elif SIZE_WIDTH == 32
    using dword_t = unsigned long long;
    #else
    #error unknown architecture
    #endif
    #endif
    union alignas(2 * sizeof(size_t)) U
    {
    U() {}
    static_assert(sizeof(std::atomic<size_t>) == sizeof(size_t), "sizeof(atomic<size_t>) must be == sizeof(size_t)");
    std::atomic<size_t> m_atomics[2];
    #if defined(_MSC_VER)
    #if defined(_M_X64) || defined(_M_ARM64) || defined(_M_ARM64EC)
    __int64 volatile m_firstAndSecond[2];
    #elif defined(_M_IX86)
    __int64 volatile m_firstAndSecond;
    #else
    #error unknown architecture
    #endif
    #elif defined(__GNUC__) || defined(__clang__)
    dword_t volatile m_firstAndSecond;
    #endif
    } u;
    };

    // Initializes the two words from `desired` with two separate relaxed
    // stores (the pair is not written as a single atomic operation).
    inline dcas_atomic::dcas_atomic( pair_t const &desired )
    {
        std::atomic<size_t> *const words = u.m_atomics;
        words[0].store( desired.first, std::memory_order_relaxed );
        words[1].store( desired.second, std::memory_order_relaxed );
    }

    // Gives access to the first word as a stand-alone std::atomic<size_t>.
    inline std::atomic<size_t> &dcas_atomic::first() noexcept
    {
        std::atomic<size_t> &word = u.m_atomics[0];
        return word;
    }

    // Gives access to the second word as a stand-alone std::atomic<size_t>.
    inline std::atomic<size_t> &dcas_atomic::second() noexcept
    {
        std::atomic<size_t> &word = u.m_atomics[1];
        return word;
    }

    // Snapshot of both words, taken with two independent relaxed loads; the
    // two halves are therefore not read as one atomic unit.
    inline dcas_atomic::operator pair_t() noexcept
    {
        size_t const firstWord = first().load( std::memory_order_relaxed );
        size_t const secondWord = second().load( std::memory_order_relaxed );
        return pair_t{ firstWord, secondWord };
    }

    // Double-width compare-and-swap.
    // Compares the stored pair with `expected`; on a match the pair is replaced
    // by `desired` and true is returned. Otherwise the value obtained from the
    // underlying primitive is written back into `expected` and false is returned.
    // `Type` must be one of the DCAS_* constants (enforced by the static_assert).
    template<int Type>
    inline bool dcas_atomic::compare_exchange( pair_t &expected, pair_t
    const &desired, dcas_type<Type> ) noexcept
    {
    using namespace std;
    static_assert(Type == DCAS_ACQ_REL || Type == DCAS_ACQUIRE || Type == DCAS_RELAXED || Type == DCAS_RELEASE || Type == DCAS_SEQ_CST);
    #if defined(_MSC_VER)
    #if defined(_M_X64) || defined(_M_ARM64) || defined(_M_ARM64EC)
    // 64-bit MSVC targets: 128-bit intrinsic. expectedA is passed as the
    // comparand; after a failed call its contents are copied back to `expected`.
    __int64 expectedA[2];
    expectedA[0] = (__int64)expected.first;
    expectedA[1] = (__int64)expected.second;
    char fRet;
    #if defined(_M_X64)
    // x64 calls the same intrinsic whatever `Type` is.
    fRet = _InterlockedCompareExchange128( u.m_firstAndSecond, desired.second, desired.first, expectedA );
    #else
    // ARM64 / ARM64EC: choose the intrinsic variant according to `Type`.
    if constexpr( Type == DCAS_ACQ_REL || Type == DCAS_SEQ_CST )
    fRet = _InterlockedCompareExchange128( u.m_firstAndSecond, desired.second, desired.first, expectedA );
    else if constexpr( Type == DCAS_ACQUIRE )
    fRet = _InterlockedCompareExchange128_acq( u.m_firstAndSecond, desired.second, desired.first, expectedA );
    else if constexpr( Type == DCAS_RELAXED )
    fRet = _InterlockedCompareExchange128_nf( u.m_firstAndSecond, desired.second, desired.first, expectedA );
    else
    fRet = _InterlockedCompareExchange128_rel( u.m_firstAndSecond, desired.second, desired.first, expectedA );
    #endif
    if( fRet )
    return true;
    expected.first = expectedA[0];
    expected.second = expectedA[1];
    return false;
    #elif defined(_M_IX86)
    // 32-bit x86: pack the pair into one 64-bit value, `first` in the low
    // 32 bits and `second` in the high 32 bits.
    auto compose = []( pair_t const &p ) -> uint64_t { return p.first | (uint64_t)p.second << 32; };
    uint64_t
    cDesired = compose( desired ),
    cExpected = compose( expected ),
    cResult = _InterlockedCompareExchange64( &u.m_firstAndSecond, cDesired, cExpected );
    // Success is detected by the returned value being equal to the comparand.
    if( cResult == cExpected ) [[likely]]
    return true;
    expected.first = (uint32_t)cResult;
    expected.second = (uint32_t)(cResult >> 32);
    return false;
    #else
    #error unspupported Windows-platform
    #endif
    #elif defined(__GNUC__) || defined(__clang__)
    // GCC / clang: FIRST names the pair_t member placed in the low half of
    // dword_t and SECOND the one placed in the high half; the choice depends on
    // std::endian::native (presumably so that the packed value matches the
    // in-memory order of m_atomics[0], m_atomics[1] - TODO confirm).
    constexpr auto
    pair_t::*FIRST = std::endian::native == std::endian::little ? &pair_t::first : &pair_t::second,
    pair_t::*SECOND = std::endian::native == std::endian::little ? &pair_t::second : &pair_t::first;
    auto compose = []( pair_t const &p ) -> dword_t { return (dword_t)(p.*FIRST) | (dword_t)(p.*SECOND) << SIZE_WIDTH; };
    dword_t
    dwExpected = compose( expected ),
    dwDesired = compose( desired );
    bool ret;
    // The 4th argument (true) selects the weak form of the builtin; the
    // failure ordering is always __ATOMIC_RELAXED.
    if constexpr( Type == DCAS_ACQ_REL )
    ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected,
    dwDesired, true, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED );
    else if constexpr( Type == DCAS_ACQUIRE )
    ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected,
    dwDesired, true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED );
    else if constexpr( Type == DCAS_RELAXED )
    ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected,
    dwDesired, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED );
    else if constexpr( Type == DCAS_RELEASE )
    ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected,
    dwDesired, true, __ATOMIC_RELEASE, __ATOMIC_RELAXED );
    else
    ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected,
    dwDesired, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED );
    if( ret ) [[likely]]
    return true;
    // Unpack the value left in dwExpected back into `expected`.
    expected.*FIRST = (size_t)dwExpected;
    expected.*SECOND = (size_t)(dwExpected >> SIZE_WIDTH);
    return false;
    #else
    #error unspupported platform
    #endif
    }

    template<int Type>
    inline typename dcas_atomic::pair_t dcas_atomic::load( dcas_type<Type> )
    const noexcept
    {
    using namespace std;
    static_assert(Type == DCAS_ACQ_REL || Type == DCAS_ACQUIRE || Type == DCAS_RELAXED || Type == DCAS_RELEASE || Type == DCAS_SEQ_CST);
    pair_t cmp( 0, 0 );
    const_cast<dcas_atomic *>(this)->compare_exchange( cmp, cmp, dcas_type<Type>() );
    return cmp;
    }

    // Writes `niu` by retrying compare_exchange() until it succeeds. The first
    // guess for the current value comes from the conversion to pair_t; each
    // failed attempt refreshes the guess.
    template<int Type>
    inline void dcas_atomic::store( pair_t const &niu, dcas_type<Type> ) noexcept
    {
        static_assert(Type == DCAS_ACQ_REL || Type == DCAS_ACQUIRE || Type == DCAS_RELAXED || Type == DCAS_RELEASE || Type == DCAS_SEQ_CST);
        pair_t current( *this );
        for( ; ; )
        {
            if( this->compare_exchange( current, niu, dcas_type<Type>() ) )
                return;
        }
    }

    #if defined(_MSC_VER)
    #pragma warning(pop)
    #endif
    #if defined(__llvm__)
    #pragma clang diagnostic pop
    #endif

    Am 18.02.2025 um 00:17 schrieb Chris M. Thomasson:
    This is a little C++20 test using a futex to allow one to wait on a
    lock-free stack. The main stack logic is in struct ct_stack. Well, can
    you get to compile and run? Thanks...

    I still need to check my algorithm out in Relacy. I think it might be
    able to model futex.


    My code:
    ______________________________________
    #include <iostream>
    #include <thread>
    #include <atomic>
    #include <algorithm>
    #include <cassert>


    // Number of worker threads launched by main().
    #define CT_THREAD_N (42)
    // Number of regular work items main() pushes onto the input stack.
    #define CT_WORK_N (10000000)
    // Sentinel head value used by ct_stack as a tag for its waiting state.
    // NOTE(review): 0xDEADBEEF is an 'unsigned int' literal, so on a 64-bit
    // MSVC build this C-style cast to a wider pointer is reported as warning
    // C4312; widening through std::uintptr_t would avoid that.
    #define CT_WAIT ((ct_node*)0xDEADBEEF)


    // Intrusive singly-linked list node; the stack links nodes through m_next.
    struct ct_node
    {
        // Empty user-provided constructor; the trace statement stays disabled.
        ct_node()
        {
            // std::cout << "(" << this << ")::ct_node::ct_node()\n";
        }

        // Empty user-provided destructor; the trace statement stays disabled.
        ~ct_node()
        {
            // std::cout << "(" << this << ")::ct_node::~ct_node()\n";
        }

        // Next node in the chain, or nullptr at the end of the chain.
        ct_node* m_next{ nullptr };
    };


    // Stack of ct_node built on a single atomic head pointer.
    // m_head holds one of: nullptr (empty), CT_WAIT (sentinel installed by
    // flush_wait() before it blocks), or a pointer to the top node.
    struct ct_stack
    {
        std::atomic<ct_node*> m_head = { nullptr };

        // Pushes `node` on top of the stack.
        // If the head value that was replaced was CT_WAIT, notify_one() is
        // called on m_head afterwards.
        void
        push(
            ct_node* node
        ) {
            ct_node* head = m_head.load(std::memory_order_relaxed);

            do
            {
                if (head == CT_WAIT)
                {
                    // The sentinel is not a node: the new node ends the chain.
                    node->m_next = nullptr;
                }

                else
                {
                    node->m_next = head;
                }

            // On failure compare_exchange_weak stores the current head into
            // `head`, so the link above is recomputed on the next iteration.
            } while (! m_head.compare_exchange_weak(
                head,
                node,
                std::memory_order_release)
              );

            // `head` now holds the value that was replaced by `node`.
            if (head == CT_WAIT)
            {
                // slow path...
                m_head.notify_one();
            }
        }

        // Detaches and returns the whole chain currently on the stack,
        // blocking (via atomic wait) until there is at least one node.
        // The returned pointer is neither nullptr nor CT_WAIT (asserted).
        ct_node*
        flush_wait()
        {
            ct_node* head = nullptr;

            // The body runs once: the unconditional break below leaves the
            // loop as soon as the inner while loop has produced a real node.
            for (;;)
            {
                // Fast path: take whatever is there and leave the stack empty.
                head = m_head.exchange(nullptr, std::memory_order_acquire);

                while (! head || head == CT_WAIT)
                {
                    // slow path...
                    // Install the sentinel so that push() knows to notify.
                    head = m_head.exchange(CT_WAIT, std::memory_order_acquire);

                    if (! head || head == CT_WAIT)
                    {
                        // Still nothing: block while m_head == CT_WAIT, then
                        // re-test the inner loop condition and retry.
                        m_head.wait(CT_WAIT, std::memory_order_relaxed);
                        continue;
                    }
                }

                break;
            }

            assert(head && head != CT_WAIT);

            return head;
        }
    };


    // Work item: a list node plus a state word. ct_thread() treats the value
    // 666 as the shutdown marker; new items start at 0.
    struct ct_work : ct_node
    {
        unsigned long m_state{ 0 };
    };


    struct ct_shared
    {
        ct_stack m_stack_in;
        ct_stack m_stack_out;

        ct_shared()
        {
            std::cout << "ct_shared::ct_shared()\n" << std::endl;
        }

        ~ct_shared()
        {
            assert(! m_stack_in.m_head || m_stack_in.m_head == CT_WAIT);
            assert(! m_stack_out.m_head || m_stack_out.m_head == CT_WAIT);

            std::cout << "ct_shared::~ct_shared()\n" << std::endl;
        }
    };


    void
    ct_thread(
        ct_shared& shared
    ) {
        unsigned long state = 0;

        while (! state)
        {
            ct_work* head = (ct_work*)shared.m_stack_in.flush_wait();

            while (head)
            {
                ct_work* next = (ct_work*)head->m_next;

                if (head->m_state == 666)
                {
                    // Shutdown detected...
                    state = 1;
                    shared.m_stack_in.push(head);
                }

                else
                {
                    shared.m_stack_out.push(head);
                }

                head = next;
            }
        }

        //std::cout << "shutdown..." << std::endl;
    }


    int
    main()
    {
        {
            std::cout << "Futex Stack Test\n";
            std::cout << "by: Chris M. Thomasson\n";
            std::cout << "version: (0.0.1)\n";
            std::cout << "_________________________________\n" << std::endl;
        }

        unsigned long g_ct_work_alloations = 0;
        unsigned long g_ct_work_dealloations = 0;

        {
            std::cout << "CT_THREAD_N = " << CT_THREAD_N << std::endl;
            std::cout << "CT_WORK_N = " << CT_WORK_N << std::endl;
            std::cout << "CT_WAIT = " << CT_WAIT << std::endl;
        }

        {
            ct_shared shared = { };

            std::thread threads[CT_THREAD_N];

            std::cout << "\nLaunching threads...\n" << std::endl;

            for (unsigned long i = 0; i < CT_THREAD_N; ++i)
            {
                threads[i] = std::thread(ct_thread, std::ref(shared));
            }

            //std::this_thread::sleep_for(std::chrono::seconds(2));

            std::cout << "\nGenerate work...\n" << std::endl;

            // Generate work...
            {
                for (unsigned long i = 0; i < CT_WORK_N; ++i)
                {
                    shared.m_stack_in.push(new ct_work);
                    ++g_ct_work_alloations;
                }
            }

            // Wait for work...
            {
                unsigned long wn = 0;

                while (wn < CT_WORK_N)
                {
                    ct_work* head = (ct_work*)shared.m_stack_out.flush_wait();

                    while (head)
                    {
                        ct_work* next = (ct_work*)head->m_next;
                        delete head;
                        ++g_ct_work_dealloations;
                        ++wn;
                        head = next;
                    }
                }
            }

            std::cout << "\nCompleted all work!\n" << std::endl;
            std::cout << "Sending shutdown state...\n" << std::endl;

            // Send shutdown state...
            {
                for (unsigned long i = 0; i < CT_THREAD_N; ++i)
                {
                    ct_work* w = new ct_work;
                    ++g_ct_work_alloations;
                    w->m_state = 666;
                    shared.m_stack_in.push(w);
                }
            }

            // Join threads...
            {
                for (unsigned long i = 0; i < CT_THREAD_N; ++i)
                {
                    threads[i].join();
                }
            }

            std::cout << "\nThreads joined...\n" << std::endl;

            // Dump shutdown state...
            std::cout << "Dump shutdown state...\n" << std::endl;

            {
                ct_work* head = (ct_work*)shared.m_stack_in.m_head.load(std::memory_order_relaxed);
                shared.m_stack_in.m_head = nullptr;

                while (head)
                {
                    ct_work* next = (ct_work*)head->m_next;
                    delete head;
                    ++g_ct_work_dealloations;
                    head = next;
                }
            }

            std::cout << "\nShutdown complete!\n" << std::endl;
        }

        // Sanity check...
        {
            std::cout << "g_ct_work_alloations = " << g_ct_work_alloations
    << "\n";
            std::cout << "g_ct_work_dealloations = " << g_ct_work_dealloations << std::endl;

            if (g_ct_work_alloations != g_ct_work_dealloations)
            {
                std::cout << "Pardon my French, but shit!!!!!!!!!!!!!! NOT
    GOOD\n" << std::endl;
                std::cout << "DAMN IT!!!! Grrrrr\n" << std::endl;
            }
        }

        std::cout << "\nFin!\n" << std::endl;

        return 0;
    }
    ______________________________________




    My output:
    _______________________
    Futex Stack Test
    by: Chris M. Thomasson
    version: (0.0.1)
    _________________________________

    CT_THREAD_N = 42
    CT_WORK_N = 10000000
    CT_WAIT = 00000000DEADBEEF
    ct_shared::ct_shared()


    Launching threads...


    Generate work...


    Completed all work!

    Sending shutdown state...


    Threads joined...

    Dump shutdown state...


    Shutdown complete!

    ct_shared::~ct_shared()

    g_ct_work_alloations = 10000042
    g_ct_work_dealloations = 10000042

    Fin!
    _______________________


    Well, any luck?

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From Wuns Haerst@21:1/5 to All on Sat May 3 04:21:42 2025
    Am 03.05.2025 um 03:49 schrieb Chris M. Thomasson:
    On 5/2/2025 4:01 AM, Wuns Haerst wrote:
    It's not hard to implement a lock-free stack with DW-CAS,
    so no need for futexes. That's my dcas_atomic:

    #pragma once
    #include <cstdint>
    #include <utility>
    #include <atomic>
    #include <type_traits>
    #include <climits>
    #include <bit>
    #include <memory>

    #if defined(_MSC_VER)
         #pragma warning(push)
         #pragma warning(disable: 26495)
    #endif
    #if defined(__llvm__)
         #pragma clang diagnostic push
         #pragma clang diagnostic ignored "-Watomic-alignment"
    #endif

    // Integer tags with the same values as the std::memory_order enumerators.
    // The memory_order_xxx constants are used because they exist in every
    // standard since C++11, unlike the scoped std::memory_order::xxx spelling.
    constexpr int DCAS_ACQ_REL = static_cast<int>(std::memory_order_acq_rel);
    constexpr int DCAS_ACQUIRE = static_cast<int>(std::memory_order_acquire);
    constexpr int DCAS_RELAXED = static_cast<int>(std::memory_order_relaxed);
    constexpr int DCAS_RELEASE = static_cast<int>(std::memory_order_release);
    constexpr int DCAS_SEQ_CST = static_cast<int>(std::memory_order_seq_cst);

    // Tag type that carries one DCAS_* constant as a compile-time value; the
    // dcas_atomic member templates take it to select the memory ordering.
    template<int Type>
    using dcas_type = std::integral_constant<int, Type>;
    using dcas_acq_rel = dcas_type<DCAS_ACQ_REL>;
    using dcas_acquire = dcas_type<DCAS_ACQUIRE>;
    using dcas_relaxed = dcas_type<DCAS_RELAXED>;
    using dcas_release = dcas_type<DCAS_RELEASE>;
    using dcas_seq_cst = dcas_type<DCAS_SEQ_CST>;

    struct dcas_atomic
    {
         static_assert(sizeof(size_t) == 8 || sizeof(size_t) == 4, "must
    be 64 or 32 bit architecture");
         struct pair_t { size_t first, second; };
         dcas_atomic() = default;
         dcas_atomic( pair_t const &desired );
         std::atomic<size_t> &first() noexcept;
         std::atomic<size_t> &second() noexcept;
         operator pair_t() noexcept;
         template<int Type>
         bool compare_exchange( pair_t &expected, pair_t const &desired,
    dcas_type<Type> = dcas_seq_cst() ) noexcept;
         template<int Type>
         pair_t load( dcas_type<Type> = dcas_seq_cst() ) const noexcept;
         template<int Type>
         void store( pair_t const &niu, dcas_type<Type> = dcas_seq_cst() ) >> noexcept;
    private:
    #if defined(__GNUC__) || defined(__clang__) && !defined(_MSC_VER)
         #if SIZE_WIDTH == 64
         using dword_t = unsigned __int128;
         #elif SIZE_WIDTH == 32
         using dword_t = unsigned long long;
         #else
             #error unknown architecture
         #endif
    #endif
         union alignas(2 * sizeof(size_t)) U
         {
             U() {}
             static_assert(sizeof(std::atomic<size_t>) == sizeof(size_t),
    "sizeof(atomic<size_t>) must be == sizeof(size_t)");
             std::atomic<size_t> m_atomics[2];
    #if defined(_MSC_VER)
         #if defined(_M_X64) || defined(_M_ARM64) || defined(_M_ARM64EC)
             __int64 volatile m_firstAndSecond[2];
         #elif defined(_M_IX86)
             __int64 volatile m_firstAndSecond;
         #else
             #error unknown architecture
         #endif
    #elif defined(__GNUC__) || defined(__clang__)
             dword_t volatile m_firstAndSecond;
    #endif
         } u;
    };

    /// Constructs the cell from `desired`, writing each word with a relaxed
    /// store (the two stores are separate, not one double-width write).
    inline dcas_atomic::dcas_atomic( pair_t const &desired )
    {
         u.m_atomics[0].store( desired.first, std::memory_order_relaxed );
         u.m_atomics[1].store( desired.second, std::memory_order_relaxed );
    }

    inline std::atomic<size_t> &dcas_atomic::first() noexcept
    {
         return u.m_atomics[0];
    }

    inline std::atomic<size_t> &dcas_atomic::second() noexcept
    {
         return u.m_atomics[1];
    }

    /// Converts to a plain pair by reading each word with its own relaxed
    /// load. The two reads are independent, so the result is not a single
    /// double-width snapshot.
    inline dcas_atomic::operator pair_t() noexcept
    {
         pair_t snapshot{ first().load( std::memory_order_relaxed ),
                          second().load( std::memory_order_relaxed ) };
         return snapshot;
    }

    template<int Type>
    inline bool dcas_atomic::compare_exchange( pair_t &expected, pair_t
    const &desired, dcas_type<Type> ) noexcept
    {
         using namespace std;
         static_assert(Type == DCAS_ACQ_REL || Type == DCAS_ACQUIRE ||
    Type == DCAS_RELAXED || Type == DCAS_RELEASE || Type == DCAS_SEQ_CST);
    #if defined(_MSC_VER)
         #if defined(_M_X64) || defined(_M_ARM64) || defined(_M_ARM64EC)
         __int64 expectedA[2];
         expectedA[0] = (__int64)expected.first;
         expectedA[1] = (__int64)expected.second;
         char fRet;
         #if defined(_M_X64)
         fRet = _InterlockedCompareExchange128( u.m_firstAndSecond,
    desired.second, desired.first, expectedA );
         #else
         if constexpr( Type == DCAS_ACQ_REL || Type == DCAS_SEQ_CST )
             fRet = _InterlockedCompareExchange128( u.m_firstAndSecond, >> desired.second, desired.first, expectedA );
         else if constexpr( Type == DCAS_ACQUIRE )
             fRet =
    _InterlockedCompareExchange128_acq( u.m_firstAndSecond,
    desired.second, desired.first, expectedA );
         else if constexpr( Type == DCAS_RELAXED )
             fRet = _InterlockedCompareExchange128_nf( u.m_firstAndSecond,
    desired.second, desired.first, expectedA );
         else
             fRet =
    _InterlockedCompareExchange128_rel( u.m_firstAndSecond,
    desired.second, desired.first, expectedA );
         #endif
         if( fRet )
             return true;
         expected.first = expectedA[0];
         expected.second = expectedA[1];
         return false;
         #elif defined(_M_IX86)
         auto compose = []( pair_t const &p ) -> uint64_t { return p.first >> | (uint64_t)p.second << 32; };
         uint64_t
             cDesired = compose( desired ),
             cExpected = compose( expected ),
             cResult = _InterlockedCompareExchange64( &u.m_firstAndSecond,
    cDesired, cExpected );
         if( cResult == cExpected ) [[likely]]
             return true;
         expected.first = (uint32_t)cResult;
         expected.second = (uint32_t)(cResult >> 32);
         return false;
         #else
             #error unspupported Windows-platform
         #endif
    #elif defined(__GNUC__) || defined(__clang__)
         constexpr auto
             pair_t::*FIRST = std::endian::native == std::endian::little ?
    &pair_t::first : &pair_t::second,
             pair_t::*SECOND = std::endian::native ==
    std::endian::little ? &pair_t::second : &pair_t::first;
         auto compose = []( pair_t const &p ) -> dword_t    { return
    (dword_t)(p.*FIRST) | (dword_t)(p.*SECOND) << SIZE_WIDTH; };
         dword_t
             dwExpected = compose( expected ),
             dwDesired = compose( desired );
         bool ret;
         if constexpr( Type == DCAS_ACQ_REL )
             ret = __atomic_compare_exchange_n( &u.m_firstAndSecond,
    &dwExpected, dwDesired, true, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED );
         else if constexpr( Type == DCAS_ACQUIRE )
             ret = __atomic_compare_exchange_n( &u.m_firstAndSecond,
    &dwExpected, dwDesired, true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED );
         else if constexpr( Type == DCAS_RELAXED )
             ret = __atomic_compare_exchange_n( &u.m_firstAndSecond,
    &dwExpected, dwDesired, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED );
         else if constexpr( Type == DCAS_RELEASE )
             ret = __atomic_compare_exchange_n( &u.m_firstAndSecond,
    &dwExpected, dwDesired, true, __ATOMIC_RELEASE, __ATOMIC_RELAXED );
         else
             ret = __atomic_compare_exchange_n( &u.m_firstAndSecond,
    &dwExpected, dwDesired, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED );
         if( ret ) [[likely]]
             return true;
         expected.*FIRST = (size_t)dwExpected;
         expected.*SECOND = (size_t)(dwExpected >> SIZE_WIDTH);
         return false;
    #else
         #error unspupported platform
    #endif
    }

    template<int Type>
    inline typename dcas_atomic::pair_t
    dcas_atomic::load( dcas_type<Type> ) const noexcept
    {
         using namespace std;
         static_assert(Type == DCAS_ACQ_REL || Type == DCAS_ACQUIRE ||
    Type == DCAS_RELAXED || Type == DCAS_RELEASE || Type == DCAS_SEQ_CST);
         pair_t cmp( 0, 0 );
         const_cast<dcas_atomic *>(this)->compare_exchange( cmp, cmp,
    dcas_type<Type>() );
         return cmp;
    }

    /// Replaces both words with `niu` as one unit.
    ///
    /// Implemented as a compare-exchange loop: the starting guess comes from
    /// the (non-atomic, two-load) pair_t conversion, and each failed attempt
    /// refreshes `cmp` with the value actually found, so the loop ends once an
    /// exchange against the current contents succeeds.
    template<int Type>
    inline void dcas_atomic::store( pair_t const &niu, dcas_type<Type> ) noexcept
    {
         using namespace std;
         static_assert(Type == DCAS_ACQ_REL || Type == DCAS_ACQUIRE || Type == DCAS_RELAXED || Type == DCAS_RELEASE || Type == DCAS_SEQ_CST);
         pair_t cmp( *this );
         while( !this->compare_exchange( cmp, niu, dcas_type<Type>() ) );
    }

    #if defined(_MSC_VER)
         #pragma warning(pop)
    #endif
    #if defined(__llvm__)
         #pragma clang diagnostic pop
    #endif


    Hard to read that mess. Where is your push and pop, and where are you
    bumping the ABA counter?




    That's only the DWCAS-atomic.

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)
  • From jseigh@21:1/5 to Chris M. Thomasson on Thu May 8 18:46:48 2025
    On 5/3/25 21:57, Chris M. Thomasson wrote:
    On 5/3/2025 12:58 PM, Chris M. Thomasson wrote:
    On 5/3/2025 11:19 AM, Bonita Montero wrote:
    Am 03.05.2025 um 20:02 schrieb Chris M. Thomasson:

    Now, it's fun to be able to get a lock-free stack with only single-
    word exchange and CAS, and have the ability to use it with a futex
    to allow one to wait on empty conditions.

    Is a lock-free stack with a single word CAS really possible without an
    ABA-problem ?

    Will get back to you. Need to run a bunch of errands. If you omit
    single item pop, yup. It's also possible to get rid of the cas in push.

    The ABA problem _and_ the memory issue are only there on the single-node
    pop for a lock-free stack. If you just whack it with a nullptr, aka flush _all_ nodes, then you are all right and ready to roll.

    Actually, IBM had/has a nice write-up of it in an appendix of the
    Principles of Operation, iirc. I think it might have been in appendix 40-
    ish or something. God, it's been a while since I read that, Bonita. Then
    they have a rather nifty PLO, or Perform Locked Operation, again iirc...

    The futex is fun to use because it allows one to wait on an empty condition...

    Actually, I remember Joe Seigh taking all the nodes in an exchange,
    reversing them, and BAM, it's FIFO. ;^)


    Probably for an MPSC queue. I don't think the FIFO semantics would hold up
    very well for an MPMC queue.

    Joe Seigh

    --- SoupGate-Win32 v1.05
    * Origin: fsxNet Usenet Gateway (21:1/5)