Username : csci5570

Password : huskydatalab

husky是一个通用的分布式的计算平台,就像mapreduce、spark这种,它是用c++写的(难受…)

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# Required
master_host=master#master跑的地方
master_port=10086#master绑定的端口
comm_port=12306#worker绑定的端口

# Worker information
[worker]
info=worker1:4#worker1有4个线程
info=worker2:4#worker2有4个线程

#如果用了hdfs,配置hdfs路径
hdfs_namenode=master
hdfs_namenode_port=9000

运行的时候用

1
./program --conf=/path/to/config.ini

写入配置。

组件

Object List

Object List(objList)是husky中最主要的对象,可以把任何对象都存在objlist中,两个objlist通过channel传递消息。

1
2
3
4
5
6
7
8
class Obj {
public:
using KeyT = int;
KeyT key;
const KeyT& id() const { return key; }
Obj() = default;
explicit Obj(const KeyT& k) : key(k) {}
};

创建一个obj只要3步:

  1. 定义一个key的类型(keyT),一般都用int
  2. 写一个id()函数来返回该对象对应的key
  3. 需要一个默认构造函数,还需要一个能够接收key参数的构造函数

接下来就可以创建、使用、删除Object List

1
2
3
4
5
6
7
8
9
10
11
12
//创建名叫my_objlist的objlist
auto& objlist = ObjListStore::create_objlist<Obj>("my_objlist");

//将obj传入创建好的objlist中
Obj obj(3);
objlist.add_object(obj);

//通过名字拿到对应的objlist,注意这里的auto关键字,自动判定类型,很舒服!
auto& objlist2 = ObjListStore::get_objlist<Obj>("my_objlist");

//通过名字删除objlist
ObjListStore::drop_objlist("my_objlist");

为了让添加在objlist中的obj被其他线程感知并利用,在多线程情况下需要将objlist全局化一下,husky已经封装好该方法

1
globalize(objlist);

接下来就是最重要的一个函数list_execute(),它规定了list里的每个object需要做的事,这个函数是用户自己定义的。它有两个参数:

  • 第一个是要操作的objlist
  • 第二个是这个objlist中每个obj要做的事,例如下面函数就是obj在log中打印id,包括之后用channel发送或接收消息也都是在这个函数里
1
2
3
list_execute(objlist, [](Obj& obj) {
base::log_msg("My id is: " + obj.id());
});

Channel

channel就是object和object互相通信的工具,他们的关系类似于城市和公路。husky中有四种channel:

  • Push Channel:最常见的点对点通信
  • Push Combined Channel:在push channel基础上增加了合并发给同个obj的
  • Broadcast Channel:将一个key-value广播出去,任何地方都可以通过key拿到值
  • Migrate Channel:用来migrate对象,将一个对象发送到另一个线程上

channel的创建、使用和drop(一定要主动销毁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// create PushChannel
template <typename MsgT, typename DstObjT>
static PushChannel<MsgT, DstObjT>&
create_push_channel(ChannelSource& src_list,
ObjList<DstObjT>& dst_list,
const std::string& name = "");

// Get PushChannel through name
template <typename MsgT, typename DstObjT>
static PushChannel<MsgT, DstObjT>&
get_push_channel(const std::string& name = "");

// Drop channel through name
static void drop_channel(const std::string& name);

下面通过例子来说明:

首先,要想创建channel,就要确定发消息的源objlist和目的objlist,当然,参数里的目的objlist必须是全局化的

1
2
//创建一个push_channel
auto& ch = ChannelStore::create_push_channel<int>(src_list, dst_list);

一般来说,channel是放在list_execute()函数里用的,要想清楚从哪个obj发,发什么,哪个obj接收(通过key来标注)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//push channel
//发送端代码
list_execute(src_list, [&ch](Obj& obj) {
ch.push(msg, key); // send msg to key
});

//接收端代码
list_execute(dst_list, [&ch](Obj& obj) {
auto& msgs = ch.get(obj); // The msgs is of type std::vector<MsgT>, MsgT is int in this case
});

//broadcast channel
auto& ch4 = ChannelStore::create_broadcast_channel<int, std::string>(src_list);
list_execute(src_list, [&ch4](Obj& obj) {
ch4.broadcast(key, value); // broadcast key, value pair
});
list_execute(src_list, [&ch4](Obj& obj) {
auto msg = ch4.get(key); // get the broadcasted value through key.
});

###Aggregator

用来执行一些聚合操作的类,可以用来做求前k大值,统计数量,计算机器学习梯度总数等。他的构造函数需要两个参数(或以上),一个是init值,另外是lambda函数

1
Aggregator<int> agg(0, [](int& a, const int& b){ a += b; });

这个lambda函数就是aggregate的规则。

在创建完agregator后,就要使用它了。可以用update函数或者update_any函数(比update可接受参数类型多)来进行aggregator,例如

1
agg.update(1);//aggregator值加1

在聚合完之后,更新的值其实只在本地,为了让这个值在全局响应要用HuskyAggregatorFactory::sync()函数。另一种方式是通过HuskyAggregatorFactory::get_channel()来拿到通道,然后在list_execute中通过这个channel把消息传播出去,这种方法最后也会去调用sync()函数

1
2
3
4
5
6
7
8
//两种方式
AggregatorFactory::sync();

// or using aggregator channel
auto& ac = AggregatorFactory::get_channel();
list_execute(obj_list, {}, {&ac}, [&](OBJ& obj) {
... // here we can give updates to some aggregators
});

当全局划这个聚合之后,就可以用get_value()函数得到值了

1
int sum = agg.get_value()

这个值是被全局共享的,所以对他的修改会影响其他executor,并可能有线程安全问题