
CK server Code:1000 error #61

Open
oliveagle opened this issue Jan 10, 2019 · 7 comments

Comments

@oliveagle

Initially we used github.com/kshvakov/clickhouse for a little while, but we found it writes data much slower than the Java HTTP client (the only one listed in the ClickHouse documentation). (snmpdump shows packet sizes about 10x smaller than the Java HTTP client's.)

[screenshots]

So we replaced it with this driver and the outcome is good (performance similar to the Java HTTP client). There is only one minor flaw: once we start the program, the ClickHouse server emits many Code: 1000 errors, which never happened before, neither with github.com/kshvakov/clickhouse nor with the Java client.

Since database/sql provides a high-level abstraction, we didn't touch the code that writes to ClickHouse while replacing the driver; we only modified some arguments used to set up the connection.

The main code looks like this:

var conn *sql.DB

var bulkChan = make(chan *ClickhouseBulk)

func (t *ckTable) bulkWrite(conn *sql.DB, bulk *ClickhouseBulk) error {
	tx, err := conn.Begin()
	if err != nil {
		return err
	}

	// odd... this had to be added while using `github.com/kshvakov/clickhouse`,
	// otherwise OOM. With this driver, `Code: 1000` remains even with this
	// commented out.
	defer tx.Rollback()

	stmt1, err := tx.Prepare(insertDDL) // the table's INSERT statement
	if err != nil {
		return err
	}

	// commenting this out, `Code: 1000` remains
	defer stmt1.Close()

	for _, row := range bulk.Rows {
		if _, err := stmt1.Exec(row...); err != nil {
			// sampled log of err
		}
	}
	return tx.Commit()
}

// worker goroutine
go func() {
	for blk := range bulkChan {
		table := tables[blk.table]
		table.bulkWrite(conn, blk)
	}
}()

We don't manage connections anywhere in our code, and the worker goroutine never exits.

[screenshot]
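A side note on the "we don't manage connections" point: with database/sql the application indeed never handles connections directly, but every `Begin` checks a connection out of the pool, and a transaction that is neither committed nor rolled back pins that connection. A minimal, self-contained sketch with a fake driver (all names here are hypothetical; no ClickHouse server involved) shows why the `defer tx.Rollback()` in the snippet above is not optional:

```go
package main

import (
	"database/sql"
	"database/sql/driver"
	"errors"
	"fmt"
)

// a fake in-memory driver: just enough surface to watch pool checkout.
type fakeTx struct{}

func (fakeTx) Commit() error   { return nil }
func (fakeTx) Rollback() error { return nil }

type fakeConn struct{}

func (fakeConn) Prepare(string) (driver.Stmt, error) { return nil, errors.New("not implemented") }
func (fakeConn) Close() error                        { return nil }
func (fakeConn) Begin() (driver.Tx, error)           { return fakeTx{}, nil }

type fakeDriver struct{}

func (fakeDriver) Open(string) (driver.Conn, error) { return fakeConn{}, nil }

func init() { sql.Register("fake", fakeDriver{}) }

// txInUse reports how many pooled connections are checked out while a
// transaction is open, and after it is rolled back.
func txInUse() (during, after int) {
	db, _ := sql.Open("fake", "")
	defer db.Close()
	tx, _ := db.Begin()
	during = db.Stats().InUse
	_ = tx.Rollback()
	after = db.Stats().InUse
	return
}

func main() {
	during, after := txInUse()
	fmt.Println("in use while tx open:", during)  // 1
	fmt.Println("in use after rollback:", after)  // 0
}
```

If an early `return err` fires between `Begin` and `Commit` without a deferred rollback, the checked-out connection is never returned to the pool.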

@bgaifullin
Contributor

How do you create the connection? Can you please share the options?
How often do you put data into bulkChan?
Could you please share the whole code of your test, or provide a minimal prototype that reproduces the issue?

@bgaifullin
Contributor

err = stmt1.exec(row...) — it seems this should be stmt1.Exec(row...)

@oliveagle
Author

oliveagle commented Jan 11, 2019

type Bulk interface {
	Append(row map[string]interface{})
	Length() int
	Full() bool
	LowWaterMarkFull() bool
}

// this channel is shared globally.
var bulkChan = make(chan Bulk, 0)



// this struct implements the `Bulk` interface and holds all data.
// once a bulk is full in the collector, it is pushed into `bulkChan` and a `BulkWrite` is fired.
// two rules:
//      1. Length() > config.BULK_SIZE (default 50,000)
//      2. last commit time is before now() - 10s (default) &&
//         Length() > config.LOW_WATER_MARK (default 5,000)
// one collector goroutine only has one bulk struct at a time; no reuse, no sharing.
type ClickHouseBulk struct {
	Rows  []map[string]interface{}
	DB    string
	Table string
}
func NewClickHouseBulk(db, table string) *ClickHouseBulk {
	return &ClickHouseBulk{
		DB:    db,
		Table: table,
		Rows:  make([]map[string]interface{}, 0),
	}
}



type CkServer struct {
	conn *sql.DB
	...
}
func NewCkServer(...) (*CkServer, error){
    // http://username:password@ip:port/database?enable_http_comression=1"
    // http://username:password@ip:port/database"
    // no more options to form a url
    db, err := sql.Open("clickhouse", url)
    if err != nil {
        return nil, err
    }
    ...
    // ~~no goroutine will be started.~~
    // return &CkServer{
    //    conn: db,
    // }, nil
    // correction: I forgot about the ping goroutine.
   ck := &CkServer{
        conn: db,
   }
   go ck.ping()
   return ck, nil
}

func (ck *CkServer) ping() {
	// ping this ckserver every second, with incremental backoff after a failure.
	// if it fails 5 or more times, we stop writing to this ckserver.
}

func (ck *CkServer) BulkWrite(bulk *ClickHouseBulk) error {
	// ...
	key := bulk.DB + ":" + bulk.Table
	if table, ok := ck.tables[key]; ok {
		return table.bulkWrite(ck.conn, bulk)
	} else {
		// we only write to a single table within one process; this branch
		// is taken only once at the beginning, and it fetches the table
		// desc blockingly (no goroutine).
		table, err := newCkTable(ck.conn, bulk.DB, bulk.Table, ck.logger)
		if err != nil {
			return err
		}
		ck.tables[key] = table
		return table.bulkWrite(ck.conn, bulk)
	}
}

func newCkTable(conn *sql.DB, db, table string, logger zerolog.Logger) (*ckTable, error) {
	lg := logger.With().Str("db", db).Str("table", table).Logger()

	sampled := util.GetBrustSampledLogger(lg)

	cktable := &ckTable{db: db, table: table, logger: lg, sampledLogger: sampled}

	//TODO: what if the refresh fails the first time?
	err := cktable.refreshDescAndDDLOnce(conn)
	if err != nil {
		return nil, err
	}
	return cktable, nil
}

func (t *ckTable) bulkWrite(conn *sql.DB, bulk *ClickHouseBulk) error {
	defer logx.RecoverPrintStack()

	t.logger.Debug().Msg("bulkWrite start")

	desc := t.getDesc()
	if desc == nil {
		t.logger.Debug().Msg("table desc is nil")
		return fmt.Errorf("table desc is nil, db: %s, table: %s", t.db, t.table)
	}

	tx, err := conn.Begin()
	if err != nil {
		return err
	}

	// odd... this had to be added while using `github.com/kshvakov/clickhouse`,
	// otherwise OOM. With this driver, `Code: 1000` remains even with this
	// commented out.
	//defer tx.Rollback()

	stmt, err := tx.Prepare(desc.insertDDL)
	if err != nil {
		return err
	}

	// commenting this out, `Code: 1000` remains
	defer stmt.Close()

	for _, msg := range bulk.Rows {
		var row []interface{}
		// .... process row here.
		for _, key := range desc.keys {
			row_desc, ok := desc.dMap[key]
			if !ok {
				//TODO: should not happen, replace it with orderedMap.
				t.sampledLogger.Error().Msg("desc key not in desc.dMap")
				break
			}
			if value, ok := msg[key]; ok && value != nil {
				row = append(row, value)
			} else {
				row = append(row, row_desc.Default)
			}
		}

		if _, err := stmt.Exec(row...); err != nil {
			t.sampledLogger.Error().Err(err).Msg("stmt.Exec failed.")
		}
	}

	// commit time
	start := time.Now()
	err = tx.Commit()
	ometrics.BulkCommitTimer.UpdateSince(start)
	return err
}


// start many workers to consume `bulkCh`
for bulk := range bulkCh {
	err := ckSrv.BulkWrite(bulk)
	if err != nil {
		ckSrv.logger.Error().Err(err).Msg("bulk write failed")
	}
}

// so the path is clear
//  bulk  := <- bulkCh
//  ckServer.BulkWrite(bulk)
//  ckTable.bulkWrite(bulk)

@oliveagle
Author

Commented out `go ck.ping()`; `Code:1000` remains.

// go ck.ping()

@oliveagle
Author

[screenshot]

@chobostar

The same happens for me:

  • clickhouse 19.8.5
  • go 1.14
  • go-clickhouse 1.3.0

I use a connection string like this:

http://readwrite:*******@my-standalone-ch.ru:8123/default?read_timeout=10s&write_timeout=20s

The server log shows:
2020.04.27 17:42:00.153602 [ 274 ] {} <Trace> HTTPHandler-factory: HTTP Request for HTTPHandler-factory. Method: POST, Address: *redacted*:27906, User-Agent: Go-http-client/1.1, Length: 7944, Content Type: , Transfer Encoding: identity
2020.04.27 17:42:00.153720 [ 274 ] {} <Trace> HTTPHandler: Request URI: /?database=default&default_format=TabSeparatedWithNamesAndTypes
2020.04.27 17:42:00.153875 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> executeQuery: (from *redacted*:27906, user: readwrite) INSERT INTO *redacted*
2020.04.27 17:42:00.154050 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> executeQuery: Query pipeline:
NullAndDoCopy
 InputStreamFromASTInsertQuery

2020.04.27 17:42:00.154513 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Information> executeQuery: Read 19 rows, 7.50 KiB in 0.001 sec., 31167 rows/sec., 12.02 MiB/sec.
2020.04.27 17:42:00.154556 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> MemoryTracker: Peak memory usage (total): 115.66 KiB.
2020.04.27 17:42:00.154615 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> MemoryTracker: Peak memory usage (for query): 2.12 MiB.
2020.04.27 17:42:00.154623 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Information> HTTPHandler: Done processing query
2020.04.27 17:42:00.154938 [ 274 ] {} <Error> ServerErrorHandler: Poco::Exception. Code: 1000, e.code() = 104, e.displayText() = Connection reset by peer (version 19.8.5)

@DoubleDi
Collaborator

Hi @chobostar, can you check whether the problem persists on the newer version 1.5.0?
