Gradle

코로나바이러스가 한창이다. 영화 컨테이젼을 보다가 gradle의 뜻을 깨달았다. 영화 마지막에 U2 노래가 흐르더라. ... from the cradle to the grave~

쿠두 insert operation을 이용한 update operation object 생성

아파치 플룸(apache flume)의 기능을 확장시키는 플러그인을 개발하고 있다.
kudu에 대량의 데이터를 insert해야 하는데 primary key가 중복된 데이터가 있다. 다행히 kudu에는 upsert가 있어 중복 데이터 여부 관계없이 데이터를 밀어넣을 수가 있다. 이런 경우에는 primary key가 같다면 마지막에 upsert한 데이터만 남을 것이다.
만약, upsert 대신 insert를 사용한다면 처음 insert한 데이터만 남을 것이다.
만약, primary key 중복 오류로 insert에 실패한 행의 수를 센 후 update해야 한다면 다음의 함수를 사용한다. 찾아봐도 없어서 만들었다.

    /**
     * 중복행 오류로 Insert에 실패한 Operation을 update시키기 위해 row copy
     * @param insertOperation the operation that failed insert
     * @return update operation
     * @throws FlumeException
     */
    private Operation getUpdateOperationFrom(Operation insertOperation) throws FlumeException {
        try {

            PartialRow rowInsert = insertOperation.getRow();
            Schema schema = table.getSchema();
            int schemaColumnCount = schema.getColumnCount();
            List columns = schema.getColumns();
            Operation operation = table.newUpdate();
            PartialRow rowUpdate = operation.getRow();

            for (int i = 0; i < schemaColumnCount; i++)
            {
                if (rowInsert.isNull(i))
                    rowUpdate.setNull(i);
                else
                {
                    switch (columns.get(i).getType()) {
                        case INT8:
                            rowUpdate.addByte(i, rowInsert.getByte(i));
                            break;
                        case INT16:
                            rowUpdate.addShort(i, rowInsert.getShort(i));
                            break;
                        case INT32:
                            rowUpdate.addInt(i, rowInsert.getInt(i));
                            break;
                        case INT64:
                            rowUpdate.addLong(i, rowInsert.getInt(i));
                            break;
                        case BINARY:
                            rowUpdate.addBinary(i, rowInsert.getBinaryCopy(i));
                            break;
                        case STRING:
                            rowUpdate.addString(i, rowInsert.getString(i));
                            break;
                        case BOOL:
                            rowUpdate.addBoolean(i, rowInsert.getBoolean(i));
                            break;
                        case FLOAT:
                            rowUpdate.addFloat(i, rowInsert.getFloat(i));
                            break;
                        case DOUBLE:
                            rowUpdate.addDouble(i, rowInsert.getDouble(i));
                            break;
                        case UNIXTIME_MICROS:
                            rowUpdate.addLong(i, rowInsert.getLong(i));
                            break;
                        default:
                            logger.warn("got unknown type {} for column '{}' during produce update operation -- ignoring this column",
                                    columns.get(i).getType(), columns.get(i).getName());
                    }
                }
            }
            return operation;

        } catch (Exception e) {
            throw new FlumeException("Failed to create Kudu Upsert object", e);
        }
    }

kudu에서 bluk insert에 실패한 operation을 이용하여 update operation을 생성할 때 쓴다.

이 블로그의 인기 게시물

JavaScript의 Math.random() 대체

아마존 오더블 오디오북을 mp3로 저장하기