基本信息
源码名称:ZeroMQ示例
源码大小:5.53M
文件格式:.7z
开发语言:C/C++
更新时间:2022-02-02
   友情提示:(无需注册或充值,赞助后即可获取资源下载链接)

     嘿,亲!知识可是无价之宝呢,但咱这精心整理的资料也耗费了不少心血呀。小小地破费一下,绝对物超所值哦!如有下载和支付问题,请联系我们QQ(微信同号):813200300

本次赞助数额为: 2 元 
   源码介绍

ZeroMQ示例


using Clock = std::chrono::steady_clock;
using TimePoint = Clock::time_point;
using namespace std::chrono_literals;
using namespace ZMQ;
using namespace LOGGER;

void* contextInproc = nullptr;
void InprocRevThread()
{
    while (contextInproc==nullptr)Sleep(100);
    void* socket = zmq_socket(contextInproc, ZMQ_SUB);
    zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
    int err = zmq_connect(socket, "inproc://cbbps");

    TimePoint prevTimePoint = Clock::now();
    int loop = 0;
    int prevLoop = 0;
    while (1) {
        char buffer[64] = { 0 };
        int size = zmq_recv(socket, buffer, 64, ZMQ_DONTWAIT);
        if (size > 0)loop ;

        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "INPROC RevCount:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }

        std::this_thread::sleep_for(1us);
    }
}


void InprocSendThread()
{
    contextInproc = nullptr;
    contextInproc = zmq_ctx_new();
    void* socket = zmq_socket(contextInproc, ZMQ_PUB);
    int err= zmq_bind(socket, "inproc://cbbps");

    while (1) {
        char  buffer[64] = "Inproc ZeroMQ Test";
        int err = zmq_send(socket, buffer, 64, ZMQ_DONTWAIT);
        std::this_thread::sleep_for(2us);
    }

}


void TCPRev1Thread(int port)
{
    ZeroMQ TCP_SUB_recv;
    int which_port = 5560 port;
    std::string ip_port_str = "tcp://10.23.33.88:" std::to_string(5560);
    TCP_SUB_recv.InitTCPReceiveMode_SUB(MAX_SIZE, ip_port_str.c_str());
    
    
    TimePoint prevTimePoint = Clock::now();
    uint32_t loop = 0;
    uint32_t prevLoop = 0;
    uint32_t reved = 0;
    uint32_t prev_reved = 0;
    while (1) {
        char buffer[MAX_SIZE] = { 0 };

        int size = TCP_SUB_recv.GetTCPReceiveData_SUB_More(buffer);

        if (size >= 0) {
            loop ;
            uint32_t* p = (uint32_t*)&buffer[MAX_SIZE - 4];
            if (*p != 0)reved ;
        }


        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "TCP Rev"<< which_port<<"Count:" << std::to_string(loop - prevLoop) << "\n";
           
            std::cout << "TCP RevData" << which_port << "Count:"<<std::to_string(reved- prev_reved)  << "\n";
            prev_reved = reved;
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }

        std::this_thread::sleep_for(1us);
    }
}


void TCPSend1Thread(int port)
{
    ZeroMQ TCP_PUB_send;
    port = 5560 0;
    std::string ip_port_str = "tcp://10.23.33.88:" std::to_string(port);
    TCP_PUB_send.InitTCPSendMode_PUB(MAX_SIZE, ip_port_str.c_str());

    uint32_t loop = 0;
    char buffer[MAX_SIZE] = { 1 };
    while (1) {
        buffer[0] = loop;
        int* p = (int*)&buffer[MAX_SIZE - 4];
         *p= loop ;
        TCP_PUB_send.SetTCPSendData_PUB_More((void*)buffer, MAX_SIZE,1024);

        std::this_thread::sleep_for(2us);
    }
}

void TCPSendThread_PUSH(int port)
{
    ZeroMQ TCP_send;
    int which_port = 5560 0;
    std::string ip_port_str = "tcp://10.23.33.88:" std::to_string(which_port);
    TCP_send.InitTCPSendMode_PUSH(MAX_SIZE, ip_port_str.c_str());

    uint32_t loop = 0;
    char buffer[MAX_SIZE] = { 1 };
    while (1) {
        buffer[0] = (char)port 1;
        int* p = (int*)&buffer[MAX_SIZE - 4];
        *p = loop ;
        TCP_send.SetTCPSendData_PUSH((void*)buffer, MAX_SIZE);

        std::this_thread::sleep_for(2us);
    }
}

void TCPRevThread_PULL(int port)
{
    ZeroMQ TCP_recv;
    int which_port = 5560 port;
    std::string ip_port_str = "tcp://10.23.33.88:" std::to_string(5560);
    TCP_recv.InitTCPReceiveMode_PULL(MAX_SIZE, ip_port_str.c_str());


    TimePoint prevTimePoint = Clock::now();
    uint32_t loop = 0;
    uint32_t prevLoop = 0;
    uint32_t reved = 0;
    uint32_t prev_reved = 0;
    while (1) {
        char buffer[MAX_SIZE] = { 0 };

        int size = TCP_recv.GetTCPReceiveData_PULL(buffer);

        if (size >= 0) {
            loop ;
            uint32_t* p = (uint32_t*)&buffer[MAX_SIZE - 4];
            if (*p != 0)reved ;
            uint8_t port = buffer[0];
            std::cout << "TCP Rev Port" << std::to_string(port) << "\n";
        }


        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "TCP Rev" << which_port << "Count:" << std::to_string(loop - prevLoop) << "\n";

            std::cout << "TCP RevData" << which_port << "Count:" << std::to_string(reved - prev_reved) << "\n";
            prev_reved = reved;
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }

        std::this_thread::sleep_for(1us);
    }
}


void DEALER_Send_Thread()
{
    ZeroMQ _0MQ;
    std::string ip_port_str = "tcp://192.168.203.127:" std::to_string(52000);
    _0MQ.InitTCPSendMode_DEALER(MAX_SIZE, ip_port_str.c_str());

    uint32_t loop = 1;
    char buffer[MAX_SIZE] = { 0 };
    while (1) {
        uint32_t* p = (uint32_t*)&buffer[0];
        *p = loop;
        if (_0MQ.SetTCPSendData_DEALER((void*)buffer, MAX_SIZE)) {
            if ((*p) % 10 == 0) {
                LOGGER_INFO("send thread sended msg count{:d}", *p);
            }
            loop ;
        }
        else {
            LOGGER_WARN("send thread sended error");
        }
        
        std::this_thread::sleep_for(10ms);
    }
}

void DEALER_RECV_Thread(size_t id)
{
    ZeroMQ _0MQ;
    std::string ip_port_str = "tcp://192.168.203.127:" std::to_string(52000);
    _0MQ.InitTCPReceiveMode_DEALER(MAX_SIZE, ip_port_str.c_str());

    uint32_t prev_head = 0;
    char buffer[MAX_SIZE] = { 0 };
    while (1) {
        if (_0MQ.GetTCPReceiveData_DEALER((void*)buffer)== MAX_SIZE) {
            uint32_t* head = (uint32_t*)&buffer[0];
            if ((*head) % 10 == 0) {
                LOGGER_INFO("recv thread{:d} recved msg count{:d}",id,*head);
            }

            if ((*head - prev_head) > 1) {
                LOGGER_ERROR("recv thread{:d} miss msg{:d}", id, *head);
            }

            prev_head = *head;
        }

        std::this_thread::sleep_for(1ms);
    }
}

void UDPRev1Thread()
{
    ZeroMQ UDP_SUB_recv;

    UDP_SUB_recv.InitUDPReceiveMode_DISH(MAX_SIZE, "udp://226.8.5.5:5555", "TV");

    TimePoint prevTimePoint = Clock::now();
    int loop = 0;
    int prevLoop = 0;
    while (1) {
        char buffer[MAX_SIZE] = { 0 };

        int size = UDP_SUB_recv.GetUDPReceiveData_DISH(buffer);

        if (size >= 0) {
            loop ;
        }


        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "UDP Rev2Count:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }

        std::this_thread::sleep_for(1us);
    }
}


void UDPRev2Thread()
{
    ZeroMQ UDP_SUB_recv;

    UDP_SUB_recv.InitUDPReceiveMode_DISH(MAX_SIZE, "udp://226.8.5.5:5555", "TV");

    TimePoint prevTimePoint = Clock::now();
    int loop = 0;
    int prevLoop = 0;
    while (1) {
        char buffer[MAX_SIZE] = { 0 };

        int size=UDP_SUB_recv.GetUDPReceiveData_DISH(buffer);

        if (size >= 0) {
            loop ;
        }


        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "UDP Rev2Count:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }

        std::this_thread::sleep_for(1us);
    }
}

void UDPSendThread()
{
    ZeroMQ UDP_PUB_send;
    
    UDP_PUB_send.InitUDPSendMode_RADIO(MAX_SIZE, "udp://226.8.5.5:5555", "TV");
    
    int loop = 0;
    char buffer[MAX_SIZE] = { 1 };
    while (1) {
        buffer[0] = loop;
        buffer[MAX_SIZE-1] = loop ;
        UDP_PUB_send.SetUDPSendData_RADIO(buffer, MAX_SIZE);

        std::this_thread::sleep_for(2us);
    }
}


void TCPSendREPThread()
{
    ZeroMQ TCP_REP_send;

    TCP_REP_send.InitTCPSendMode_REP(MAX_SIZE, "tcp://127.0.0.1:5566");

    int loop = 0;
    int prevLoop = 0;
    uint8_t buffer[MAX_SIZE] = { 1,2,3 };
    TimePoint prevTimePoint = Clock::now();
    while (1) {
        buffer[MAX_SIZE-1] = loop;

        

        uint8_t rev_buffer[MAX_SIZE] = { 1,2,3 };
        if (TCP_REP_send.GetTCPReceiveData_REP(rev_buffer)>0) {
            //std::cout << "REP RevCount:"  << "\n";
            loop ;
            TCP_REP_send.SetTCPSendData_REP((void*)buffer, MAX_SIZE);
            
        }

        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "TCP REP RevCount:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }
        std::this_thread::sleep_for(2us);
    }

}

void TCPRevREQThread()
{
    ZeroMQ TCP_REQ_rev;

    TCP_REQ_rev.InitTCPReceiveMode_REQ(MAX_SIZE, "tcp://127.0.0.1:5566");//不能写localhost

    uint8_t buffer[MAX_SIZE] = { 9,2,3 };
    int loop = 1;
    
    while (1) {

        if (TCP_REQ_rev.GetTCPReceiveData_REQ(buffer)>0) {
            //std::cout << "REQ RevCount:" << "\n";
            buffer[0] = (loop 1);
            buffer[1] = (loop 1);
            buffer[2] = (loop 1);
            buffer[MAX_SIZE-1] = (loop 1);
            loop ;
            TCP_REQ_rev.SetTCPSendData_REQ((void*)buffer, MAX_SIZE);
        }
        std::this_thread::sleep_for(2us);
    }

}

std::array< std::thread, 10> rev_thr_array_list;
std::thread send_thr;
std::thread revc_thr;
int main()
{
    std::cout << "Hello World!\n";
    if (!g_Logger) {
        g_Logger = std::make_unique<LogSPD>("log/ARJ21_HYS_panel.log", spdlog::level::trace);
    }
   
    char c=getchar();
   // std::thread th1=std::thread(InprocSendThread);
   // th1.detach();
   // std::thread th2= std::thread(InprocRevThread);
   // th2.detach();

   //std::thread th2= std::thread(TCPRevREQThread);
   //th2.detach();
   //std::thread th3 = std::thread(TCPSendREPThread);
   //th3.detach();

    //TCP PUB
    /*
    if (c == '1')
    {
        for (int i = 0; i < 1; i ) {
            thr_array[i] = std::thread(TCPSend1Thread,i);
            thr_array[i].detach();
        }
    }
    else if (c == '2')
    {
        for (int i = 0; i < 1; i ) {
            thr_array[i] = std::thread(TCPRev1Thread,i);
            thr_array[i].detach();
        }
    }
    */

    //TCP PUB
    /*
    for (int i = 0; i < 1; i ) {
        thr_array[i] = std::thread(TCPSend1Thread, i);
        thr_array[i].detach();
        Sleep(3000);
    }
    Sleep(1000);
    for (int i = 0; i < 3; i ) {
        thr_array[i] = std::thread(TCPRev1Thread, i);
        thr_array[i].detach();
        Sleep(3000);
    }
    */
   
    //TCP REP REQ
   //std::thread th6 = std::thread(TCPRevREQThread);
   //th6.detach();
   // Sleep(1000);
   // std::thread th7 = std::thread(TCPSendREPThread);
   //th7.detach();

    //UDP PUB SUB
    //std::thread th3 = std::thread(UDPSendThread);
    //th3.detach();
    //Sleep(1000);
    //std::thread th4 = std::thread(UDPRev1Thread);
    //th4.detach();
    //std::thread th5 = std::thread(UDPRev2Thread);
    //th5.detach();

    //TCP PUSH PULL
    /*
    for (int i = 0; i < 2; i ) {
        thr_array[i] = std::thread(TCPSendThread_PUSH, i);
        thr_array[i].detach();
        Sleep(3000);
    }
    Sleep(1000);
    for (int i = 0; i < 1; i ) {
        thr_array[i] = std::thread(TCPRevThread_PULL, i);
        thr_array[i].detach();
        Sleep(3000);
    }
    */


    if (c == '1') {
        LOGGER_INFO("send mode");
        send_thr = std::thread(DEALER_Send_Thread);
    }
    else if (c == '2') {
        LOGGER_INFO("recv mode");
        for (auto i = 0; i < 10; i ) {
            rev_thr_array_list[i] = std::thread(DEALER_RECV_Thread, i);
        }
    }
    else if (c == '3') {
        LOGGER_INFO("send recv mode");
        send_thr = std::thread(DEALER_Send_Thread);
        for (auto i = 0; i < 1; i ) {
            rev_thr_array_list[i] = std::thread(DEALER_RECV_Thread, i);
        }
    }

    while (1) {
        Sleep(150);
    }

}

.
├── ZeroMQ_Test
│   ├── Debug
│   │   ├── ZeroMQ_Test.log
│   │   ├── ZeroMQ_Test.obj
│   │   ├── ZeroMQ_Test.tlog
│   │   │   ├── CL.command.1.tlog
│   │   │   ├── CL.read.1.tlog
│   │   │   ├── CL.write.1.tlog
│   │   │   ├── ZeroMQ_Test.lastbuildstate
│   │   │   ├── link-cvtres.read.1.tlog
│   │   │   ├── link-cvtres.write.1.tlog
│   │   │   ├── link-rc.read.1.tlog
│   │   │   ├── link-rc.write.1.tlog
│   │   │   ├── link.command.1.tlog
│   │   │   ├── link.read.1.tlog
│   │   │   ├── link.write.1.tlog
│   │   │   └── unsuccessfulbuild
│   │   ├── vc142.idb
│   │   └── vc142.pdb
│   ├── LogSPD.h
│   ├── Logger.h
│   ├── ZMQ.cpp
│   ├── ZeroMQ.cpp
│   ├── ZeroMQ.h
│   ├── ZeroMQ_Test.cpp
│   ├── ZeroMQ_Test.vcxproj
│   ├── ZeroMQ_Test.vcxproj.filters
│   ├── ZeroMQ_Test.vcxproj.user
│   ├── cbbps
│   ├── log
│   │   └── ARJ21_HYS_panel.log
│   ├── pch.cpp
│   ├── pch.h
│   ├── spdlog
│   │   ├── async.h
│   │   ├── async_logger-inl.h
│   │   ├── async_logger.h
│   │   ├── cfg
│   │   │   ├── argv.h
│   │   │   ├── env.h
│   │   │   ├── helpers-inl.h
│   │   │   └── helpers.h
│   │   ├── common-inl.h
│   │   ├── common.h
│   │   ├── details
│   │   │   ├── backtracer-inl.h
│   │   │   ├── backtracer.h
│   │   │   ├── circular_q.h
│   │   │   ├── console_globals.h
│   │   │   ├── file_helper-inl.h
│   │   │   ├── file_helper.h
│   │   │   ├── fmt_helper.h
│   │   │   ├── log_msg-inl.h
│   │   │   ├── log_msg.h
│   │   │   ├── log_msg_buffer-inl.h
│   │   │   ├── log_msg_buffer.h
│   │   │   ├── mpmc_blocking_q.h
│   │   │   ├── null_mutex.h
│   │   │   ├── os-inl.h
│   │   │   ├── os.h
│   │   │   ├── periodic_worker-inl.h
│   │   │   ├── periodic_worker.h
│   │   │   ├── registry-inl.h
│   │   │   ├── registry.h
│   │   │   ├── synchronous_factory.h
│   │   │   ├── tcp_client-windows.h
│   │   │   ├── tcp_client.h
│   │   │   ├── thread_pool-inl.h
│   │   │   ├── thread_pool.h
│   │   │   ├── udp_client-windows.h
│   │   │   ├── udp_client.h
│   │   │   └── windows_include.h
│   │   ├── fmt
│   │   │   ├── bin_to_hex.h
│   │   │   ├── bundled
│   │   │   │   ├── args.h
│   │   │   │   ├── chrono.h
│   │   │   │   ├── color.h
│   │   │   │   ├── compile.h
│   │   │   │   ├── core.h
│   │   │   │   ├── fmt.license.rst
│   │   │   │   ├── format-inl.h
│   │   │   │   ├── format.h
│   │   │   │   ├── locale.h
│   │   │   │   ├── os.h
│   │   │   │   ├── ostream.h
│   │   │   │   ├── printf.h
│   │   │   │   ├── ranges.h
│   │   │   │   └── xchar.h
│   │   │   ├── chrono.h
│   │   │   ├── compile.h
│   │   │   ├── fmt.h
│   │   │   ├── ostr.h
│   │   │   └── xchar.h
│   │   ├── formatter.h
│   │   ├── fwd.h
│   │   ├── logger-inl.h
│   │   ├── logger.h
│   │   ├── pattern_formatter-inl.h
│   │   ├── pattern_formatter.h
│   │   ├── sinks
│   │   │   ├── android_sink.h
│   │   │   ├── ansicolor_sink-inl.h
│   │   │   ├── ansicolor_sink.h
│   │   │   ├── base_sink-inl.h
│   │   │   ├── base_sink.h
│   │   │   ├── basic_file_sink-inl.h
│   │   │   ├── basic_file_sink.h
│   │   │   ├── daily_file_sink.h
│   │   │   ├── dist_sink.h
│   │   │   ├── dup_filter_sink.h
│   │   │   ├── hourly_file_sink.h
│   │   │   ├── mongo_sink.h
│   │   │   ├── msvc_sink.h
│   │   │   ├── null_sink.h
│   │   │   ├── ostream_sink.h
│   │   │   ├── qt_sinks.h
│   │   │   ├── ringbuffer_sink.h
│   │   │   ├── rotating_file_sink-inl.h
│   │   │   ├── rotating_file_sink.h
│   │   │   ├── sink-inl.h
│   │   │   ├── sink.h
│   │   │   ├── stdout_color_sinks-inl.h
│   │   │   ├── stdout_color_sinks.h
│   │   │   ├── stdout_sinks-inl.h
│   │   │   ├── stdout_sinks.h
│   │   │   ├── syslog_sink.h
│   │   │   ├── systemd_sink.h
│   │   │   ├── tcp_sink.h
│   │   │   ├── udp_sink.h
│   │   │   ├── win_eventlog_sink.h
│   │   │   ├── wincolor_sink-inl.h
│   │   │   └── wincolor_sink.h
│   │   ├── spdlog-inl.h
│   │   ├── spdlog.h
│   │   ├── stopwatch.h
│   │   ├── tweakme.h
│   │   └── version.h
│   ├── x64
│   │   └── Debug
│   │       ├── ZeroMQ.obj
│   │       ├── ZeroMQ_Test.Build.CppClean.log
│   │       ├── ZeroMQ_Test.exe.recipe
│   │       ├── ZeroMQ_Test.ilk
│   │       ├── ZeroMQ_Test.log
│   │       ├── ZeroMQ_Test.obj
│   │       ├── ZeroMQ_Test.tlog
│   │       │   ├── CL.command.1.tlog
│   │       │   ├── CL.read.1.tlog
│   │       │   ├── CL.write.1.tlog
│   │       │   ├── ZeroMQ_Test.lastbuildstate
│   │       │   ├── link.command.1.tlog
│   │       │   ├── link.read.1.tlog
│   │       │   └── link.write.1.tlog
│   │       ├── ZeroMQ_Test.vcxproj.FileListAbsolute.txt
│   │       ├── vc142.idb
│   │       └── vc142.pdb
│   ├── zmq.h
│   └── zmq_utils.h
├── ZeroMQ_d
│   ├── libzmq-v142-mt-gd-4_3_5.exp
│   ├── libzmq-v142-mt-gd-4_3_5.lib
│   ├── libzmq-v142-mt-sgd-4_3_5.lib
│   ├── testutil-static.lib
│   ├── testutil.lib
│   ├── unity.lib
│   └── unity.pdb
└── 好例子网_ZeroMQ_Test.7z

14 directories, 154 files