diff --git a/experimental/init_from_json/main.py b/experimental/init_from_json/main.py new file mode 100644 index 00000000..dcac601f --- /dev/null +++ b/experimental/init_from_json/main.py @@ -0,0 +1,39 @@ +import argparse + +import tensorflow as tf +from kungfu.python import current_cluster_size, current_rank, init_from_config +from kungfu.tensorflow.ops import all_reduce + + +def parse_args(): + p = argparse.ArgumentParser() + p.add_argument('--rank', type=int, default=0) + return p.parse_args() + + +def main(): + args = parse_args() + config = { + 'cluster': { + 'peers': [ + '127.0.0.1:10010', + '127.0.0.1:10011', + ], + }, + 'self': { + 'rank': args.rank, + }, + } + + init_from_config(config) + + rank = current_rank() + size = current_cluster_size() + print('%d/%d' % (rank, size)) + x = tf.Variable(1 + int(rank), dtype=tf.int32) + y = all_reduce(x) + print(x, y) + print('done') + + +main() diff --git a/experimental/init_from_json/run.sh b/experimental/init_from_json/run.sh new file mode 100755 index 00000000..922c64a5 --- /dev/null +++ b/experimental/init_from_json/run.sh @@ -0,0 +1,13 @@ +#!/bin/sh +set -e + +cd $(dirname $0) + +export KUNGFU_DISABLE_AUTO_LOAD=1 + +python3 main.py --rank 0 & +python3 main.py --rank 1 & + +wait + +echo done diff --git a/srcs/cpp/include/kungfu/peer.hpp b/srcs/cpp/include/kungfu/peer.hpp index 8c741599..1b45570e 100644 --- a/srcs/cpp/include/kungfu/peer.hpp +++ b/srcs/cpp/include/kungfu/peer.hpp @@ -15,11 +15,13 @@ using TransformFunc = std::function &GetDefault(bool reinit = false); + + static std::unique_ptr &GetDefault(bool reinit = false); Peer(); Peer(int rank, int size); // Single Machine Multi-Process + Peer(const char *pJson); // init from JSON ~Peer(); diff --git a/srcs/cpp/include/kungfu/python/c_api.h b/srcs/cpp/include/kungfu/python/c_api.h index 9cc620a1..d8a3d12f 100644 --- a/srcs/cpp/include/kungfu/python/c_api.h +++ b/srcs/cpp/include/kungfu/python/c_api.h @@ -7,6 +7,7 @@ extern "C" { extern void kungfu_python_init(); extern void kungfu_python_init_single_machine(int rank, int size); +extern void kungfu_python_init_from_json(const char *pJson); extern void kungfu_python_init_nccl(); extern void kungfu_python_finialize(); diff --git a/srcs/cpp/src/peer.cpp b/srcs/cpp/src/peer.cpp index 98e9539c..b2325db2 100644 --- a/srcs/cpp/src/peer.cpp +++ b/srcs/cpp/src/peer.cpp @@ -28,6 +28,15 @@ Peer::Peer(int rank, int size) } } +Peer::Peer(const char *pJson) +{ + const int err = GoKungfuInitFromJSON(const_cast(pJson)); + if (err) { + fprintf(stderr, "%s failed\n", "GoKungfuInitFromJSON"); + exit(1); + } +} + Peer::~Peer() { GoKungfuFinalize(); } bool Peer::Detached() const { return GoKungfuDetached(); } diff --git a/srcs/cpp/src/python/init.cpp b/srcs/cpp/src/python/init.cpp index 7e8894e9..f77fc98b 100644 --- a/srcs/cpp/src/python/init.cpp +++ b/srcs/cpp/src/python/init.cpp @@ -38,6 +38,12 @@ void kungfu_python_init_single_machine(int rank, int size) _init_affinity(); } +void kungfu_python_init_from_json(const char *pJson) +{ + kungfu::Peer::GetDefault().reset(new kungfu::Peer(pJson)); + _init_affinity(); +} + void kungfu_python_finialize() { kungfu::Peer::GetDefault().reset(nullptr); } uint64_t kungfu_uid() { return kungfu::Peer::GetDefault()->Uid(); } diff --git a/srcs/go/kungfu/env/config_json.go b/srcs/go/kungfu/env/config_json.go new file mode 100644 index 00000000..6a4eb4c4 --- /dev/null +++ b/srcs/go/kungfu/env/config_json.go @@ -0,0 +1,60 @@ +package env + +import ( + "encoding/json" + "net" + "strconv" + + "github.com/lsds/KungFu/srcs/go/kungfu/base" + "github.com/lsds/KungFu/srcs/go/plan" +) + +type peerID plan.PeerID // customized JSON encoding + +func (p peerID) MarshalJSON() ([]byte, error) { + port := strconv.Itoa(int(p.Port)) + addr := net.JoinHostPort(plan.FormatIPv4(p.IPv4), port) + return json.Marshal(addr) +} + +func (p *peerID) UnmarshalJSON(bs []byte) error { + var s string + if err := json.Unmarshal(bs, &s); err != nil { + return err + } + id, err := plan.ParsePeerID(s) + if err != nil { + return err + } + *p = peerID(*id) + return nil +} + +type clusterSpec struct { + Peers []peerID `json:"peers"` +} + +type peerSpec struct { + Rank int `json:"rank"` +} + +type kungfuConfig struct { + Cluster clusterSpec `json:"cluster"` + Self peerSpec `json:"self"` +} + +func ParseConfigFromJSON(js string) (*Config, error) { + var kfConfig kungfuConfig + if err := json.Unmarshal([]byte(js), &kfConfig); err != nil { + return nil, err + } + var initPeers plan.PeerList + for _, p := range kfConfig.Cluster.Peers { + initPeers = append(initPeers, plan.PeerID(p)) + } + return &Config{ + InitPeers: initPeers, + Self: initPeers[kfConfig.Self.Rank], + Strategy: base.DefaultStrategy, + }, nil +} diff --git a/srcs/go/kungfu/env/config_json_test.go b/srcs/go/kungfu/env/config_json_test.go new file mode 100644 index 00000000..13a09aeb --- /dev/null +++ b/srcs/go/kungfu/env/config_json_test.go @@ -0,0 +1 @@ +package env diff --git a/srcs/go/libkungfu-comm/main.go b/srcs/go/libkungfu-comm/main.go index b08e02d7..101a6672 100644 --- a/srcs/go/libkungfu-comm/main.go +++ b/srcs/go/libkungfu-comm/main.go @@ -33,6 +33,21 @@ func GoKungfuInit() int { return errorCode("Start", defaultPeer.Start()) } +//export GoKungfuInitFromJSON +func GoKungfuInitFromJSON(pJSON *C.char) int { + js := C.GoString(pJSON) + log.Errorf("GoKungfuInitFromJSON: %s", js) + cfg, err := env.ParseConfigFromJSON(js) + if err != nil { + errorCode("ParseConfigFromJSON", err) + } + defaultPeer, err = peer.NewFromConfig(cfg) + if err != nil { + return errorCode("NewFromConfig", err) + } + return errorCode("Start", defaultPeer.Start()) +} + //export GoKungfuInitSingleMachine func GoKungfuInitSingleMachine(rank, size C.int) int { cfg, err := env.SingleMachineEnv(int(rank), int(size)) diff --git a/srcs/python/kungfu/python/__init__.py b/srcs/python/kungfu/python/__init__.py index 507f9d9e..e65918e3 100644 --- a/srcs/python/kungfu/python/__init__.py +++ b/srcs/python/kungfu/python/__init__.py @@ -1,5 +1,7 @@ import atexit +import json import os +from ctypes import byref, c_char, c_char_p, c_int from kungfu.loader import (_call_method, _call_method_with, _load_clib, _module_path) @@ -23,9 +25,26 @@ def _load_and_init_python_lib(): return _python_lib, has_nccl -_python_lib = None -_has_nccl = None -_python_lib, _has_nccl = _load_and_init_python_lib() +def _load_python_lib(): + _load_clib('libkungfu') + _python_lib = _load_clib('libkungfu_python') + return _python_lib + + +if os.getenv('KUNGFU_DISABLE_AUTO_LOAD'): + _python_lib = _load_python_lib() + _has_nccl = None +else: + _python_lib, _has_nccl = _load_and_init_python_lib() + + +def init_from_config(config): + global _has_nccl + js = json.dumps(config) + # https://stackoverflow.com/questions/61294630/ctypes-passing-a-·string-as-a-pointer-from-python-to-c + _call_method_with(_python_lib, 'kungfu_python_init_from_json', + c_char_p(js.encode())) + _has_nccl = _call_method(_python_lib, 'kungfu_python_init_nccl') def _init_single_machine_multiple_process(rank, size): @@ -129,8 +148,6 @@ def show_nccl_version(): # unstable APIs -from ctypes import byref, c_int, c_char - def _resize_from_url(): changed = c_char()