Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[matsumiya] メモ #10

Open
rmatsumiya opened this issue Sep 1, 2019 · 47 comments
Open

[matsumiya] メモ #10

rmatsumiya opened this issue Sep 1, 2019 · 47 comments
Assignees

Comments

@rmatsumiya
Copy link

No description provided.

@rmatsumiya
Copy link
Author

通信周りについて読んでみる

@rmatsumiya
Copy link
Author

ncclAllReduce()辺りは行数がかなり短い。
引数から構造体を作って、ncclEnqueueCheck()に投げている。

@rmatsumiya
Copy link
Author

rmatsumiya commented Sep 1, 2019

ncclEnqueueCheck()は同期非同期を判断して、saveKernel()にデータを渡す。

  • 同期時ならばここでenqueue(して待つ)が、非同期時はここではenqueueしないようだ。直感に反しているのだが、非同期時に呼び出されるncclAsyncColl()あるいはsaveKernel()にヒントがありそう。

@rmatsumiya
Copy link
Author

ncclAsyncColl()はThread Local Storageに保存されているncclAsyncArgs構造体にinfo->commを保存するための関数のようだ。即ちこの時点では通信はKickされない?

@rmatsumiya
Copy link
Author

Channelという概念を使っている。リングバッファを使っていると思っていたが違うようだ。

struct ncclChannel* channel = info->comm->channels+(info->comm->myParams->gridDim.x % info->comm->nChannels);
  struct ncclProxyArgs proxyArgs;
  memset(&proxyArgs, 0, sizeof(struct ncclProxyArgs));
  NCCLCHECK(computeColl(info, &coll, &proxyArgs));

この辺りの構造体や関数がかなり重要そうだが、よくわからない。

    // Proxy
    proxyArgs.channel = channel;
    NCCLCHECK(transportSaveProxies(&proxyArgs, info->pattern, info->root, info->comm->nRanks));

    info->comm->myParams->gridDim.x++;

    int opIndex = channel->collFifoTail;
    struct ncclColl* c = channel->collectives+opIndex;
    volatile uint8_t* activePtr = (volatile uint8_t*)&c->active;
    while (activePtr[0] != 0) sched_yield();

    memcpy(c, &coll, sizeof(struct ncclColl));

proxyはchannelを持っていて、ChannelがCollを持っているようだ。
computeCollとtransportSaveProxies辺りを読んでみる。

@rmatsumiya
Copy link
Author

computeColl()を読んでたら、Chunk辺りの概念が出始めてきた。
誰かが前回に読んでた気がするので、ちょっとログを漁ってみる。

@rmatsumiya
Copy link
Author

#4 (comment)

Chunk辺りについてはy1r先生が読んでた

@rmatsumiya
Copy link
Author

  int stepSize   = ( llMode ? NCCL_LL_BUFF_SIZE : info->comm->channels[0].buffSize ) / NCCL_STEPS;
  int chunkSteps = (llMode|treeMode) ? 1 : info->chunkSteps;
  int sliceSteps = (llMode|treeMode) ? 1 : info->sliceSteps;
  int chunkSize  = stepSize*chunkSteps;

llModeとtreeModeというのに依存してchunkの大きさが決まるらしい。

  // Compute llMode, nChannels, nThreads
  int llMode;
  getKernelInfo(info, &coll->args.nChannels, &coll->args.nThreads, &llMode);

  int treeMode = info->pattern >= ncclPatternTreeUp ? 1 : 0;
  coll->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, llMode, treeMode);

色々ツッコミどころのありそうなコードだ……

@rmatsumiya
Copy link
Author

rmatsumiya commented Sep 1, 2019

static void getKernelInfo(struct ncclInfo* info, uint8_t* nChannels, uint16_t* nThreads, int* llMode) {
  // Compute thresholds and limits that users can override
  ssize_t perThreadLLThreshold = std::min<ssize_t>(info->comm->threadThreshold, NCCL_LL_CHANNEL_THRESHOLD);
  int maxLLNthreads = std::min(NCCL_LL_MAX_NTHREADS, info->comm->nThreads);

  // First compute nThreads
  int nt = NCCL_LL_MIN_NTHREADS;
  while (DIVUP(info->nBytes, nt*info->nchunksPerLoop) > perThreadLLThreshold && nt*2 <= maxLLNthreads) nt *= 2;

  // Then compute nChannels
  int nc = DIVUP(info->nBytes, nt*info->nchunksPerLoop*perThreadLLThreshold);
  if (nc == 0) nc = 1;
  if (nc > info->comm->nChannels) nc = info->comm->nChannels;

  // Check if we have a fixed LL threshold, otherwise compute it.
  int perThreadThreshold = info->comm->threadThreshold;
  if (info->pattern >= ncclPatternTreeUp) perThreadThreshold *= 4;
  ssize_t llThreshold = info->comm->llThreshold >= 0 ?
    info->comm->llThreshold :
    nc*nt*info->nchunksPerLoop*perThreadThreshold;

  if (info->nBytes <= llThreshold) {
    *llMode = 1;
    *nChannels = nc;
    *nThreads = nt;
  } else {
    *llMode = 0;
    *nChannels = info->comm->nChannels;
    *nThreads = info->comm->nThreads+1;
  }
}
// Channels / LL tuning
#define NCCL_LL_CHANNEL_THRESHOLD 8 // Per thread size before we start increasing nrings
#define NCCL_THREAD_THRESHOLD 64  // Per thread size before we switch to non-LL
#define NCCL_THREAD_THRESHOLD_PREVOLTA 32 // Per thread size before we switch to non-LL for pre-Volta archs
#define NCCL_LL_MIN_NTHREADS 64

LL is 何……
バイト数が何らかの値を下回ったらllModeというのに該当するっぽいので、「小さいデータのやりとり」と仮定して読みすすめる。

@rmatsumiya
Copy link
Author

typedef enum {
  ncclPatternRing,
  ncclPatternRingTwice,
  ncclPatternPipelineFrom,
  ncclPatternPipelineTo,
  ncclPatternTreeUp,
  ncclPatternTreeDown,
  ncclPatternTreeUpDown
} ncclPattern_t;

enumに順序関係を持たせるのはアンチパターンだと思っているのだが、それは一旦おいておく。
要するに、木を上下するようなタイプの通信がtreeModeに該当するということのようだ。

@rmatsumiya
Copy link
Author

  // Compute lastChunkSize
  if (treeMode == 1 && llMode == 0) {
    if (info->pattern == ncclPatternTreeUpDown) {
      // Optimize chunkSize / nSteps
      while (info->nBytes / (coll->args.nChannels*chunkSize) < info->comm->channels[0].tree.depth*8 && chunkSize > 131072) chunkSize /= 2;
      while (info->nBytes / (coll->args.nChannels*chunkSize) < info->comm->channels[0].tree.depth*4 && chunkSize > 65536) chunkSize /= 2;
      while (info->nBytes / (coll->args.nChannels*chunkSize) < info->comm->channels[0].tree.depth && chunkSize > 32768) chunkSize /= 2;
    }
    // Use lastChunkSize as chunkSize
    coll->args.lastChunkSize = chunkSize / ncclTypeSize(info->datatype);
  } else if (llMode == 1) {
    int sliceSize = NCCL_LL_SLICE_LINES * sizeof(uint64_t);
    const ssize_t loopSize = coll->args.nChannels*info->nchunksPerLoop*(ssize_t)sliceSize;
    coll->args.lastChunkSize = DIVUP((info->nBytes-(info->nBytes/loopSize)*loopSize), coll->args.nChannels*info->nchunksPerLoop);
    ALIGN_SIZE(coll->args.lastChunkSize, coll->args.nThreads*sizeof(uint64_t));
    coll->args.lastChunkSize /= ncclTypeSize(info->datatype);
  }

  // Compute nSteps for proxies
  size_t nBytes  = llMode ? info->nBytes*2 : info->nBytes;

treeModeのときはチャンクの大きさを小さくしているようだ。lastChunkSizeに入るのは要素数?

  // Compute nSteps for proxies
  size_t nBytes  = llMode ? info->nBytes*2 : info->nBytes;

  int nLoops = (int)(DIVUP(nBytes, (((size_t)(coll->args.nChannels))*info->nchunksPerLoop*chunkSize)));
  proxyArgs->nsteps = info->nstepsPerLoop * nLoops * chunkSteps;
  proxyArgs->sliceSteps = sliceSteps;
  proxyArgs->chunkSteps = chunkSteps;
  proxyArgs->llMode = llMode;
  proxyArgs->opCount = info->comm->opCount;
  TRACE(NCCL_NET,"opCount %lx slicesteps %d spl %d cpl %d nbytes %zi -> llmode %d nchannels %d nthreads %d, nloops %d nsteps %d comm %p",
      coll->args.opCount, proxyArgs->sliceSteps, info->nstepsPerLoop, info->nchunksPerLoop, nBytes, llMode, coll->args.nChannels, coll->args.nThreads,
      nLoops, proxyArgs->nsteps, info->comm);
  return ncclSuccess;
}

スライス数を変更せずにproxyArgsに入れているのはsliceとchunkの違いを知る手がかりになりそう。
特にLLでない時はsliceSizeにすら触れていない。

@rmatsumiya
Copy link
Author

ncclResult_t transportSaveProxies(struct ncclProxyArgs* args, int pattern, int root, int nranks) {
  if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice || pattern == ncclPatternPipelineFrom || pattern == ncclPatternPipelineTo) {
    struct ncclRing* ring = &args->channel->ring;
    if (NeedProxy(RECV, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy<proxyRecv>(ring->prev, args));
    if (NeedProxy(SEND, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy<proxySend>(ring->next, args));
  }
  if (pattern == ncclPatternTreeUp || pattern == ncclPatternTreeUpDown) {
    // Tree up
    struct ncclTree* tree = &args->channel->tree;
    for (int i=0; i<NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy<proxyRecv>(tree->down[i], args));
    NCCLCHECK(SaveProxy<proxySend>(tree->up, args));
  }
  if (pattern == ncclPatternTreeDown || pattern == ncclPatternTreeUpDown) {
    // Tree down
    struct ncclTree* tree = &args->channel->tree;
    for (int i=0; i< NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy<proxySend>(tree->down[i], args));
    NCCLCHECK(SaveProxy<proxyRecv>(tree->up, args));
  }
  return ncclSuccess;
}
static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) {
  if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice) return true;

  /* In chains, one rank does not need a proxy. Let's figure out which one it is */
  // Which index in the reorganized rings should we compare root against */
  const int myrank = 0, nextrank = 1, prevrank = nranks-1;
  int index = pattern == ncclPatternPipelineFrom ?
      /*                            no recv /  no send    if root = */
      /* bcast  */ (type == RECV ?   myrank : nextrank ):
      /* reduce */ (type == RECV ? prevrank :   myrank );
  int rank = ring->userRanks[index];
  return (root != rank);
}

ツリー型の通信とリング型の通信ではProxyを生成し、パイプライン式では条件次第のようだ。
パイプライン式というのはbcastやreduceのことのように見える。
ブロードキャスト的な通信において、自分から自分に対する通信ではProxyを生成しない。という話のようだ。

@rmatsumiya
Copy link
Author

ncclResult_t transportAllocateProxyArgs(struct ncclComm* comm, struct ncclProxyArgs** argsptr) {
  struct ncclProxyState* state = &comm->proxyState;
  struct ncclProxyArgs* elem;
  pthread_mutex_lock(&state->mutex);
  if (state->pool == NULL) {
    // Allocate a new pool of elements
    struct ncclProxyPool* newPool;
    NCCLCHECK(ncclCalloc(&newPool, 1));
    struct ncclProxyArgs* newElems = newPool->elems;
    // Chain newly allocated elements
    for (int i=0; i<PROXYARGS_ALLOCATE_SIZE; i++) {
      if (i+1 < PROXYARGS_ALLOCATE_SIZE) newElems[i].next = newElems+i+1;
    }
    // Add them all to the pool list
    state->pool = newElems;
    // Save the pool memory block for later resource release
    newPool->next = state->pools;
    state->pools = newPool;
  }
  elem = state->pool;
  state->pool = state->pool->next;
  pthread_mutex_unlock(&state->mutex);
  elem->next = elem->nextPeer = NULL;
  *argsptr = elem;
  return ncclSuccess;
}

static void ProxyAppend(struct ncclConnector* connector, struct ncclProxyArgs* args) {
  struct ncclComm* comm = connector->comm;
  struct ncclProxyState* state = &comm->proxyState;
  pthread_mutex_lock(&state->mutex);
  if (connector->proxyAppend == NULL) {
    // Nothing running for that peer. Add to the circular list
    if (state->ops == NULL) {
      // Create the list
      args->next = args;
      state->ops = args;
    } else {
      // Insert element in the list
      args->next = state->ops->next;
      state->ops->next = args;
    }
    connector->proxyAppend = args;
  } else {
    // There is an active operation already for that peer.
    // Add it to the per-peer list
    connector->proxyAppend->nextPeer = args;
    connector->proxyAppend = args;
  }
  pthread_mutex_unlock(&state->mutex);
}

template <int type>
static ncclResult_t SaveProxy(int peer, struct ncclProxyArgs* args) {
  if (peer < 0) return ncclSuccess;

  struct ncclPeer* peerComm = args->channel->peers+peer;
  struct ncclConnector* connector = type == proxyRecv ? &peerComm->recv : &peerComm->send;
  if (connector->transportComm->proxy == NULL) return ncclSuccess;

  struct ncclProxyArgs* op;
  NCCLCHECK(transportAllocateProxyArgs(connector->comm, &op));
  memcpy(op, args, sizeof(struct ncclProxyArgs));
  op->connector = connector;
  op->progress = connector->transportComm->proxy;
  op->state = ncclProxyOpReady;
  ProxyAppend(connector, op);
  return ncclSuccess;
}

バッファにProxyArgsを打ち込んで終了する。

@rmatsumiya
Copy link
Author

このバッファはpersistentThreadといういかにもな別スレッドよう関数で読み込まれている。

@rmatsumiya
Copy link
Author

    if (op->state != ncclProxyOpNone) ret = op->progress(op);

この行で通信しているのでしょう

@rmatsumiya
Copy link
Author

struct ncclProxyArgs;
typedef ncclResult_t (*proxyProgressFunc_t)(struct ncclProxyArgs*);

struct ncclProxyArgs {
  proxyProgressFunc_t progress;
struct ncclTransportComm {
  ncclResult_t (*setup)(struct ncclPeerInfo*, struct ncclPeerInfo*, struct ncclConnect*, struct ncclConnector*, int buffSize, int channelId);
  ncclResult_t (*connect)(struct ncclConnect*, struct ncclConnector*);
  ncclResult_t (*free)(void*);
  ncclResult_t (*proxy)(struct ncclProxyArgs*);
};

struct ncclTransport {
  const char name[4];
  ncclResult_t (*canConnect)(ncclTvalue_t*, struct ncclPeerInfo*, struct ncclPeerInfo*);
  ncclResult_t (*getRings)(int, int*, int*, ncclTvalue_t*, int*, int*, int*, int, int*);
  struct ncclTransportComm send;
  struct ncclTransportComm recv;
};

通信の実態はncclTransportCommのproxyであって、それは通信相手による(ネットワーク経由なのか、shmなのか、p2pなのか)。

@rmatsumiya
Copy link
Author

つまるところ、Proxyとは「あらゆる通信方法(経路や送受信)に対応するインターフェース」であると。

@rmatsumiya
Copy link
Author

p2pやshmだとsliceやchunkの概念が登場しない。

  if (args->state == ncclProxyOpReady) {
    // Update opCount
    resources->hostRecvMem->opCount = args->opCount;

    // Round to next multiple of sliceSteps
    resources->step = ROUNDUP(resources->step, args->chunkSteps);
    args->head = resources->step;
    args->tail = resources->step;
    args->end = args->head + args->nsteps;
    args->state = ncclProxyOpProgress;
  }
  if (args->state == ncclProxyOpProgress) {
  }
  return ncclSuccess;

IDEで調べた限りでは、ncclProxyOpProgressが使われているのはここだけ。
そしてncclProxyOpReadyがセットされるのはSaveProxyだけ。

@rmatsumiya
Copy link
Author

        if (args->llMode) {
          int buffSlot = args->tail%NCCL_STEPS;
          int size = sizesFifo[buffSlot];
          if (size != -1) {
            uint32_t flag = NCCL_LL_FLAG(args->tail + 1);
            int nFifoLines = DIVUP(size, sizeof(union ncclLLFifoLine));
            size = nFifoLines * sizeof(union ncclLLFifoLine);
            union ncclLLFifoLine* lines = resources->hostRecvMem->llBuff+buffSlot*NCCL_LL_SLICE_LINES;
            int ready = 1;
            for (int i=0; i<nFifoLines; i++) {
              volatile uint32_t *f1 = &lines[i].flag1;
              volatile uint32_t *f2 = &lines[i].flag2;
              if (f1[0] != flag || f2[0] != flag) { ready = 0; break; }
            }
            if (ready) {
              NCCLCHECK(ncclNetIsend(resources->netSendComm, lines, size, resources->llMhandle, args->requests+buffSlot));
              if (args->requests[buffSlot] != NULL) {
                sizesFifo[buffSlot] = -1;
                // Make sure size is reset to zero before we update the head.
                __sync_synchronize();
                args->tail += args->sliceSteps;
                args->idle = 0;
              }
            }
          }
        } 

LLModeでは特殊なバッファから送っているようだ

@rmatsumiya
Copy link
Author

      if (args->head < args->tail) {
        int done;
        int buffSlot = args->head%NCCL_STEPS;
        NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL));
        if (done) {
          args->head += args->sliceSteps;
          resources->hostSendMem->head = args->head;
          args->idle = 0;
        }
      }

循環バッファの1単位がstepっぽい。

@rmatsumiya
Copy link
Author

netSendConnect()の後半

  NCCLCHECK(ncclNetRegMr(resources->netSendComm, recvMem->buff, resources->buffSize,
        resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->mhandle));
  NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->devHostRecvMem->llBuff,
        NCCL_LL_BUFF_SIZE, NCCL_PTR_HOST, &resources->llMhandle));

ここでピンダウンしている。devHostRecvMemを使っているということは、もしかしてGDRを使わなかった時用のメモリ領域?

@rmatsumiya
Copy link
Author

rmatsumiya commented Sep 1, 2019

  NCCLCHECK(ncclCudaHostAlloc((void**)&resources->hostRecvMem, (void**)&resources->devHostRecvMem, recvSize));
static inline ncclResult_t ncclCudaHostAlloc(void** ptr, void** devPtr, size_t size) {
  CUDACHECK(cudaHostAlloc(ptr, size, cudaHostAllocMapped));
  memset(*ptr, 0, size);
  *devPtr = *ptr;
  return ncclSuccess;
}

template <typename T>
static ncclResult_t ncclCudaCalloc(T** ptr, size_t nelem) {
  CUDACHECK(cudaMalloc(ptr, nelem*sizeof(T)));
  CUDACHECK(cudaMemset(*ptr, 0, nelem*sizeof(T)));
  return ncclSuccess;
}

LLというのは、GPUとの通信的な意味でのLow-Latencyということっぽい。
Panda先生のところが「GDR使うよりもUVM使ったほうが集団通信だと速くなるよ!」みたいな論文を出していたような気がしていて、それを応用したのかも?

@rmatsumiya
Copy link
Author

/* CollectiveArgs + ncclColl are to be a power of two, currently 64 bytes, */
/* to make sure reads to host from the CUDA kernel are aligned. */
/* Make sure to adjust padding at the end of ncclColl. */

Linked-Listなのにstd::listとかを使っていない理由はこういう事情っぽい。
要するにGPUにも載せたい場合があるので、下手にstd系のデータ構造を使えないと。

@rmatsumiya
Copy link
Author

template<int UNROLL, class FUNC, typename T>
__device__ void ncclReduceScatterTreeKernel(struct CollectiveArgs* args) { }

template<int UNUSED, class FUNC, typename T>
__device__ void ncclReduceScatterRingLLKernel(struct CollectiveArgs* args) {
  const int tid = threadIdx.x;
  const int bid = args->bid;

LLを使いそうな何かを一式見つけたけど、今は使われていないっぽい……?
(他のカーネルも同様)

@rmatsumiya
Copy link
Author

Proxyは他rankとの通信で使われる構造で、ChannelはGPUとのやりとりで使われる構造。

@rmatsumiya
Copy link
Author

  struct ncclChannel* channel = comm->channels+blockIdx.x;
  struct ncclRing* ring = &channel->ring;
  const ssize_t size = args->N;
  const int nranks = comm->nRanks;
  const int stepSize = channel->buffSize / (sizeof(T)*NCCL_STEPS);
  const int stepSize = channel->buffSize / (sizeof(T)*NCCL_STEPS);
  const int chunkSize = stepSize * ALLREDUCE_CHUNKSTEPS;
  const ssize_t loopSize = args->nChannels*(ssize_t)chunkSize;

  // Compute pointers
  const T * __restrict__ thisInput = (const T*)args->ThisInput;
  T * __restrict__ thisOutput = (T*)args->ThisOutput;
    // step 0: push data to next GPU
    rankDest = ring->devUserRanks[nranks-1];
    offset = chunkOffset + rankDest * size;

    prims.send(thisInput+offset, nelem);

reduceのコードだが、ブロック毎にアクセスパターンが変わるように見える

@rmatsumiya
Copy link
Author

GPUメモリ側のバッファ領域と、それ以外の領域からのコピーのためにChannelが使われているということは分かった。どういう方法でコピーしているのか(どのスレッドがどの部分をコピーしているのか)がよく分からない……

@rmatsumiya
Copy link
Author

AllReduceのカーネルを読むと、Chunkは1スレッドブロックあたりが担当するコピーサイズっぽい?

      ssize_t offset = gridOffset + bid*chunkSize;
      int nelem = min(chunkSize, size-offset);
      if (tree->up == -1) {
        prims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem);
      } else if (tree->down[0] == -1) {
        prims.send(thisInput+offset, nelem);
      } else {
        prims.recvReduceSend(thisInput+offset, nelem);
      }

コピーを行うメインの関数は

template<int UNROLL, class FUNC, typename T, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS>
__device__ __forceinline__ void ReduceOrCopyMulti(const int tid, const int nthreads,
    int nsrcs, const T* srcs[MAXSRCS], int ndsts, T* dsts[MAXDSTS],
    int N) {

だが、色々工夫しすぎていてぱっと見ただけでは細かくはわからない。
「128bit単位でコピーしてるんだなー」とか「アンロール使いまくってるなー」というのはさすがにわかるが……

inline __device__ void Fetch128(Pack128& v, const Pack128* p) {
  asm volatile("ld.volatile.global.v2.u64 {%0,%1}, [%2];" : "=l"(v.x), "=l"(v.y) : "l"(p) : "memory");
}
inline __device__ void Store128(Pack128* p, Pack128& v) {
  asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" :: "l"(p), "l"(v.x), "l"(v.y) : "memory");
}

@rmatsumiya
Copy link
Author

rmatsumiya commented Sep 1, 2019

Proxyとは: あらゆる通信方法(経路や送受信)に対応するインターフェース
Channelとは: GPUメモリ側のバッファ領域と、それ以外の領域とのコピーのための構造体
stepとは: 通信用循環バッファの1単位
sliceとは: stepをまとめたもの。実際の通信はslice単位で行われる。
chunkとは: (1つのブロックが転送する)GPUメモリ側のバッファ領域の1単位
次回: プリミティブを読む

@rmatsumiya
Copy link
Author

プリミティブはprimitive.hにて一通り定義。
実体はGenericOpという関数にて実装されているようだ。

  template <int DIRECTRECV, int DIRECTSEND, int RECV, int SEND, int SRC, int DST>
  inline __device__ void
  GenericOp(const T* srcPtr, T* dstPtr, int nelem, int directOffset) 

@rmatsumiya
Copy link
Author

テンプレート引数は1と0のみが引き渡されている。Booleanのようだ。

@rmatsumiya
Copy link
Author

Booleanとしてだけでなく、計算にも使っている。

    const T* srcs[RECV*NRECV+SRC];
    srcs[0] = SRC ? srcPtr : directRecvPtr<DIRECTRECV>(0, directOffset);
    if (RECV) {
      if (SRC) srcs[1] = recvPtr(0);
      for (int i=1; i<NRECV && i<nrecv; i++) srcs[SRC+i] = recvPtr(i);
    }

    T* dsts[SEND*NSEND+DST];
    dsts[0] = DST ? dstPtr : directSendPtr<DIRECTSEND>(0, directOffset);
    if (SEND) {
      if (DST) dsts[1] = directSendPtr<DIRECTSEND>(0, directOffset);
      for (int i=1; i<NSEND && i<nsend; i++) dsts[DST+i] = directSendPtr<DIRECTSEND>(i, directOffset);
    }

recvPtr等はバッファのポインタを返す関数

  inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepSize; }
  inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepSize; }
  inline __device__ const T* recvPtr(int i) { return ((const T*)recvBuff[i])+recvOffset(i); }
  inline __device__ T* sendPtr(int i) { return ((T*)sendBuff[i])+sendOffset(i); }

NSEND/NRECVはクラステンプレート引数

@rmatsumiya
Copy link
Author

      if (tid < nthreads) {
        FOR_SEND(waitSend);
        FOR_RECV(waitRecv);
        if (realSize > 0) {
          barrier();
          if (DIRECTRECV && recvDirectBuff[0]) {
            // We can only have one direct receive. Since srcs[0] == dstPtr+offset, skip one copy
            if (SEND) {
              ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, NSEND>(tid, nthreads, 1, srcs, nsend, dsts+1, realSize);
            }
          } else {
            ReduceOrCopyMulti<UNROLL, FUNC, T, RECV+SRC, RECV*NRECV+SRC, SEND+DST, SEND*NSEND+DST>(tid, nthreads, RECV*nrecv+SRC, srcs, SEND*nsend+DST, dsts, realSize);
          }
        }
        exitIfAbortBarrier(abort);
      } else {
        exitIfAbortBarrier(abort);
        FOR_SEND(postSendSize, realSize*sizeof(T));
        if (SEND) __threadfence_system();
        FOR_SEND(postSend);
        FOR_RECV(postRecv);
      }

postSend/waitSend、postRecv/postSendはそれぞれ対応している。

  inline __device__ void waitRecv(int i) {
    spins = 0;
    mismatch = 0;
    recvStep[i] += SLICESTEPS;
    if (tid == i) {
      while (*(waitPtr) < recvStep[i]) {
        if (checkAbort(recvConn[i]->opCountRem)) break;
      }
    }
  }

  inline __device__ void waitSend(int i) {
    spins = 0;
    mismatch = 0;
    sendStep[i] += SLICESTEPS;
    if (tid == WARP_SIZE+i) {
      while (sendConnHead[i] + NCCL_STEPS < sendStep[i]) {
        sendConnHead[i] = *waitPtr;
        if (checkAbort(sendConn[i]->opCountRem)) break;
      }
    }
  }

  inline __device__ void postRecv(int i) {
    *(recvConn[i]->head) = recvStep[i] += SLICESTEPS;
  }

  inline __device__ void postSend(int i) {
    *(sendConn[i]->tail) = sendStep[i] += SLICESTEPS;
  }

@rmatsumiya
Copy link
Author

クラステンプレート引数やインスタンス変数が多すぎて色々と分からない。
プリミティブの生成プロセスに着目する必要がある。

@rmatsumiya
Copy link
Author

プリミティブのコンストラクタ

  __device__ __forceinline__
  ncclPrimitives(const int tid, const int nthreads, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm, const uint64_t opCount)
    : comm(comm), tid(tid), nthreads(nthreads), stepSize(stepSize), opCount(opCount) {
    // Make sure step is updated before we read it
    __syncthreads();

    for (int i=0; i<NRECV && recvPeers[i] >= 0; i++) loadRecvConn(&channel->devPeers[recvPeers[i]].recv.conn, i, directBuff);
    for (int i=0; i<NSEND && sendPeers[i] >= 0; i++) loadSendConn(&channel->devPeers[sendPeers[i]].send.conn, i, directBuff);
  }

@rmatsumiya
Copy link
Author

プリミティブはこんな感じで、集約通信系のカーネルの最初の方で初期化されている

  ncclPrimitives<UNROLL, ALLREDUCE_CHUNKSTEPS/ALLREDUCE_SLICESTEPS, ALLREDUCE_SLICESTEPS, T, 1, 1, FUNC>
    prims(tid, nthreads, &ring->prev, &ring->next, thisOutput, stepSize, channel, comm, args->opCount);

プリミティブのテンプレート引数はこんなかんじ

// Implementation of primitive types
template <int UNROLL, int SLICESPERCHUNK, int SLICESTEPS, typename T, int NRECV, int NSEND, class FUNC>
class ncclPrimitives 

IDEによれば、FUNCはunusedなようだが……?

@rmatsumiya
Copy link
Author

↑のソースコードの対応やカーネルの中身から、Tはコピーするデータの型であることはわかる。
AllReduceTree以外はNRECVとNSENDは1

@rmatsumiya
Copy link
Author

各カーネルを呼び出すグローバル関数はcommon.hのマクロで定義されている

#define IMPL_COLL_KERN(coll, op, ncclFunc, dtype, ctype, fIndex) \
__launch_bounds__(MAXTHREADS+WARP_SIZE, 1) \
__global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclColl firstColl) { \
  int tid = threadIdx.x; \
  int bid = blockIdx.x; \
  __shared__ struct ncclColl localColl; \
 \
  struct ncclDevComm* comm = firstColl.args.comm; \
  struct ncclChannel* channel = comm->channels+bid; \
  struct ncclColl* c; \
  if (bid == 0) { \
    /* To optimize for latency, (only) the first operation is passed as argument.*/ \
    c = &firstColl; \
  } else { \
    c = &localColl; \
    load_coll(c, channel->devCollectives+channel->collFifoHead, tid); \
  } \
  while (1) { \
    if (tid < c->args.nThreads) { \
      if (c->funcIndex == fIndex) { \
        coll##Kernel<COLL_UNROLL, ncclFunc<ctype>, ctype>(&c->args); \
      } else { \
        ncclFuncs[c->funcIndex](&c->args); \
      } \
    } \
    int nextIndex = c->nextIndex; \
    if (tid == 0) channel->collFifoHead = nextIndex; \
 \
    if (c->active == 2) { \
      return; \
    } \
 \
    /* Load next collective operation*/ \
    c = &localColl; /* for bid 0 */ \
    load_coll(c, channel->devCollectives+nextIndex, tid); \
  } \
}

@rmatsumiya
Copy link
Author

Asyncの場合、関数呼び出しがされているのはncclBarrierEnqueWait() <- ncclGroupEnd()

@rmatsumiya
Copy link
Author

つまり、ncclGroupEnd()が呼び出されることによって、はじめてGPU内メモリコピーが行われる

@rmatsumiya
Copy link
Author

ncclGroupEnd()内のコメント

  /* Collectives are done in three steps :
   * 1. Barrier Check In. Only the last call may call cudaLaunchKernel[cooperative]
   * 2. Barrier Wait. No CUDA call is permitted
   * 3. Enqueue Events. CUDA event wait/enqueue.
   * This is needed because step 2 cannot call any CUDA primitive, otherwise if
   * cudaFree happens between 1 and 3, it could block that CUDA call and
   * prevent some ranks from launching their network threads, which would
   * prevent the NCCL call from completing, blocking the cudaFree call.
   */

@rmatsumiya
Copy link
Author

ncclBarrierEnque()でもグローバル関数呼び出しが発生する

  int isLast = 0;
  NCCLCHECK(ncclCpuBarrierIn(comm, &isLast));

  if (isLast) {
    if (comm->launchMode == ncclComm::GROUP) {
      // I'm the last. Launch all operations.
      NCCLCHECK(ncclLaunchCooperativeKernelMultiDevice(comm->intraParams, comm->intraCudaDevs, comm->intraRanks, *comm->intraCGMode));
    }
    NCCLCHECK(ncclCpuBarrierLast(comm));
  }

@rmatsumiya
Copy link
Author

ncclBarrierEnqueueWait()の中で通信待ち系のスレッドがresumeされる。

  NCCLCHECK(transportStartProxy(comm));

@rmatsumiya
Copy link
Author

Async環境において、ncclAllReduce()等では通信は一切発生しない。saveKernel()で通信キューにenqueされるだけ。

ncclGroupEnd()にてGPU側のコピー処理が走り、それらが全部完了するとp2pやshm、IBを使った通信が発生する。

@rmatsumiya
Copy link
Author

GPU内の集約しょりについて調査中……

#if NCCL_OP == 0 && NCCL_TYPE == 0
#define IMPL_COLL_C(collf, colln) \
  IMPL_COLL3(collf, copy, FuncSum, i8, int8_t, colln, ncclSum, ncclInt8);
#else
#define IMPL_COLL_C(collf, colln)
#endif

FuncSumはどこで定義されているんだろう。

@rmatsumiya
Copy link
Author

reduce_kernel.hに普通に定義されていた……

@rmatsumiya
Copy link
Author

rmatsumiya commented Oct 6, 2019

GPUメモリコピー(or Reduce)はこんな感じか?

  1. DSTバッファ群/SRCバッファ群(の最初のバッファ以外)のうち、先頭がアラインメントされていないものがある: 128bits単位で行うことを諦める
  2. SRCバッファの最初のバッファだけがアラインメントされていない: はみ出している部分だけ普通に行う
  3. 128b系の命令とパイプライン並列を組み合わせてReduce or memcpyを行う
    • MINSRCS, MINDSTSはループアンローリングで行うバッファの数
    • パイプの数についてはこんなコメントが乗っている
// Try to limit consecutive load/stores to 8.
// Use UNROLL 8 when we have a single source and a single destination, 4 otherwise
  1. UNROLL=8にして送りきれなかった部分→末尾のアラインメント不可能な部分の順にメモリコピーする

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant